diff options
100 files changed, 3183 insertions, 1373 deletions
diff --git a/.gitignore b/.gitignore index c2f47953b..ee87c9d50 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ .mypy_cache .test .testrepository +.nox .tox .venv .coverage diff --git a/.zuul.yaml b/.zuul.yaml index 66a3de1e3..420ac11ad 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -40,44 +40,47 @@ zuul_ansible_version: 6 - job: - name: zuul-tox + name: zuul-nox description: | Zuul unit tests with ZooKeeper running - parent: tox + parent: nox nodeset: ubuntu-jammy - pre-run: playbooks/zuul-tox/pre.yaml - post-run: playbooks/zuul-tox/post-system-logs.yaml + pre-run: playbooks/zuul-nox/pre.yaml + post-run: playbooks/zuul-nox/post-system-logs.yaml vars: - tox_environment: + nox_environment: ZUUL_ZK_CA: /opt/zookeeper/ca/certs/cacert.pem ZUUL_ZK_CERT: /opt/zookeeper/ca/certs/client.pem ZUUL_ZK_KEY: /opt/zookeeper/ca/keys/clientkey.pem ZUUL_TEST_ROOT: /tmp/zuul-test YARN_REGISTRY: "https://{{ zuul_site_mirror_fqdn }}:4443/registry.npmjs" + CI: "1" test_setup_environment: ZUUL_TEST_ROOT: /tmp/zuul-test YARN_REGISTRY: "https://{{ zuul_site_mirror_fqdn }}:4443/registry.npmjs" - job: - name: zuul-tox-remote - parent: tox + name: zuul-nox-remote + parent: nox nodeset: ubuntu-jammy timeout: 2700 # 45 minutes - pre-run: playbooks/zuul-tox/pre.yaml - post-run: playbooks/zuul-tox/post-system-logs.yaml + pre-run: playbooks/zuul-nox/pre.yaml + post-run: playbooks/zuul-nox/post-system-logs.yaml vars: - tox_envlist: remote - tox_environment: + nox_session: remote + nox_environment: ZUUL_ZK_CA: /opt/zookeeper/ca/certs/cacert.pem ZUUL_ZK_CERT: /opt/zookeeper/ca/certs/client.pem ZUUL_ZK_KEY: /opt/zookeeper/ca/keys/clientkey.pem ZUUL_SSH_KEY: /home/zuul/.ssh/id_rsa ZUUL_REMOTE_IPV4: "{{ nodepool.interface_ip }}" ZUUL_REMOTE_KEEP: "true" + CI: "1" - job: + # Zuul cient uses this job so we can't just delete it yet. name: zuul-tox-zuul-client - parent: zuul-tox + parent: zuul-nox description: | Test that Zuul and zuul-client work together. 
required-projects: @@ -88,28 +91,42 @@ tox_envlist: zuul_client - job: - name: zuul-tox-py311 - parent: zuul-tox + name: zuul-nox-zuul-client + parent: zuul-nox + description: | + Test that Zuul and zuul-client work together. + required-projects: + - zuul/zuul + - zuul/zuul-client + vars: + zuul_work_dir: "{{ zuul.projects['opendev.org/zuul/zuul'].src_dir }}" + nox_session: zuul_client + +- job: + name: zuul-nox-py311 + parent: zuul-nox timeout: 7200 # 120 minutes vars: - tox_envlist: py311 + nox_keyword: tests + nox_force_python: "3.11" python_version: "3.11" - job: - name: zuul-tox-py38 - parent: zuul-tox + name: zuul-nox-py38 + parent: zuul-nox timeout: 7200 # 120 minutes vars: - tox_envlist: py38 + nox_keyword: tests + nox_force_python: "3.8" python_version: "3.8" nodeset: ubuntu-focal - job: - name: zuul-tox-py311-multi-scheduler - parent: zuul-tox-py311 + name: zuul-nox-py311-multi-scheduler + parent: zuul-nox-py311 voting: false vars: - tox_environment: + nox_environment: ZUUL_SCHEDULER_COUNT: 2 - job: @@ -278,17 +295,18 @@ vars: node_version: 16 release_python: python3 + ensure_tox_version: "<4" check: jobs: - zuul-build-image - - zuul-tox-docs - - tox-linters: + - zuul-nox-docs + - nox-linters: vars: - tox_install_bindep: false + nox_install_bindep: false nodeset: ubuntu-jammy - - zuul-tox-py38 - - zuul-tox-py311 - - zuul-tox-py311-multi-scheduler + - zuul-nox-py38 + - zuul-nox-py311 + - zuul-nox-py311-multi-scheduler - zuul-build-dashboard-openstack-whitelabel - zuul-build-dashboard-software-factory - zuul-build-dashboard-opendev @@ -303,22 +321,22 @@ - web/.* nodeset: ubuntu-jammy - zuul-stream-functional-6 - - zuul-tox-remote + - zuul-nox-remote - zuul-quick-start: requires: nodepool-container-image dependencies: zuul-build-image - - zuul-tox-zuul-client + - zuul-nox-zuul-client - zuul-build-python-release gate: jobs: - zuul-upload-image - - zuul-tox-docs - - tox-linters: + - zuul-nox-docs + - nox-linters: vars: - tox_install_bindep: false + 
nox_install_bindep: false nodeset: ubuntu-jammy - - zuul-tox-py38 - - zuul-tox-py311 + - zuul-nox-py38 + - zuul-nox-py311 - zuul-build-dashboard - nodejs-run-lint: vars: @@ -331,16 +349,16 @@ - web/.* nodeset: ubuntu-jammy - zuul-stream-functional-6 - - zuul-tox-remote + - zuul-nox-remote - zuul-quick-start: requires: nodepool-container-image dependencies: zuul-upload-image - - zuul-tox-zuul-client + - zuul-nox-zuul-client - zuul-build-python-release promote: jobs: - zuul-promote-image - - zuul-promote-docs + - zuul-promote-nox-docs - opendev-promote-python: vars: download_artifact_job: zuul-build-python-release @@ -351,7 +369,7 @@ release: jobs: - zuul-release-python - - zuul-publish-tox-docs + - zuul-publish-nox-docs - upload-docker-image: secrets: name: docker_credentials diff --git a/Dockerfile b/Dockerfile index 506ed7462..df326bd8a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,8 +27,9 @@ ARG REACT_APP_ZUUL_API # Optional flag to enable React Service Worker. (set to true to enable) ARG REACT_APP_ENABLE_SERVICE_WORKER # Kubectl/Openshift version/sha -ARG OPENSHIFT_URL=https://github.com/openshift/origin/releases/download/v3.11.0/openshift-origin-client-tools-v3.11.0-0cbc58b-linux-64bit.tar.gz -ARG OPENSHIFT_SHA=4b0f07428ba854174c58d2e38287e5402964c9a9355f6c359d1242efd0990da3 +ARG OPENSHIFT_URL=https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/4.11.20/openshift-client-linux-4.11.20.tar.gz +ARG OPENSHIFT_SHA=74f252c812932425ca19636b2be168df8fe57b114af6b114283975e67d987d11 +ARG PBR_VERSION= COPY . 
/tmp/src COPY --from=js-builder /tmp/src/build /tmp/src/zuul/web/static @@ -46,7 +47,16 @@ RUN /output/install-from-bindep \ && curl -L $OPENSHIFT_URL -o /tmp/openshift-install/openshift-client.tgz \ && cd /tmp/openshift-install/ \ && echo $OPENSHIFT_SHA /tmp/openshift-install/openshift-client.tgz | sha256sum --check \ - && tar xvfz openshift-client.tgz --strip-components=1 -C /tmp/openshift-install + && tar xvfz openshift-client.tgz -C /tmp/openshift-install + + +FROM docker.io/library/golang:1.19-bullseye AS skopeo-builder +RUN apt-get update \ + && apt-get install -y git build-essential libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config \ + && git clone https://github.com/containers/skopeo.git \ + && cd skopeo && git checkout v1.9.3 \ + && make bin/skopeo \ + && cp bin/skopeo /tmp/skopeo FROM docker.io/opendevorg/python-base:3.11-bullseye as zuul ENV DEBIAN_FRONTEND=noninteractive @@ -66,11 +76,16 @@ CMD ["/usr/local/bin/zuul"] FROM zuul as zuul-executor ENV DEBIAN_FRONTEND=noninteractive COPY --from=builder /usr/local/lib/zuul/ /usr/local/lib/zuul -COPY --from=builder /tmp/openshift-install/kubectl /usr/local/bin/kubectl COPY --from=builder /tmp/openshift-install/oc /usr/local/bin/oc +# The oc and kubectl binaries are large and have the same hash. +# Copy them only once and use a symlink to save space. +RUN ln -s /usr/local/bin/oc /usr/local/bin/kubectl +# See note above about this workaround. These are the runtime +# dependencies, this should just install skopeo when we upgrade. 
+COPY --from=skopeo-builder /tmp/skopeo /usr/local/bin/skopeo RUN apt-get update \ - && apt-get install -y skopeo \ + && apt-get install -y libdevmapper1.02.1 libgpgme11 \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* diff --git a/doc/source/config/job.rst b/doc/source/config/job.rst index 435d68e7e..f9e7b7b71 100644 --- a/doc/source/config/job.rst +++ b/doc/source/config/job.rst @@ -1046,6 +1046,14 @@ Here is an example of two job definitions: value is used to determine if the job should run. This is a :ref:`regular expression <regex>` or list of regular expressions. + .. warning:: + + File filters will be ignored for refs that don't have any + files. This will be the case for merge commits (e.g. in a post + pipeline) or empty commits created with + ``git commit --allow-empty`` (which can be used in order to + run all jobs). + .. attr:: irrelevant-files This is a negative complement of **files**. It indicates that @@ -1055,6 +1063,14 @@ Here is an example of two job definitions: are in the docs directory. A :ref:`regular expression <regex>` or list of regular expressions. + .. warning:: + + File filters will be ignored for refs that don't have any + files. This will be the case for merge commits (e.g. in a post + pipeline) or empty commits created with + ``git commit --allow-empty`` (which can be used in order to + run all jobs). + .. attr:: match-on-config-updates :default: true diff --git a/doc/source/config/pipeline.rst b/doc/source/config/pipeline.rst index f4d7cce69..41bedfbf2 100644 --- a/doc/source/config/pipeline.rst +++ b/doc/source/config/pipeline.rst @@ -266,6 +266,17 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of type of the connection will dictate which options are available. See :ref:`drivers`. + .. 
attr:: allow-other-connections + :default: true + + If this is set to `false` then any change enqueued into the + pipeline (whether it is enqueued to run jobs or merely as a + dependency) must be from one of the connections specified in the + pipeline configuration (this includes any trigger, reporter, or + source requirement). When used in conjuctions with + :attr:`pipeline.require`, this can ensure that pipeline + requirements are exhaustive. + .. attr:: supercedes The name of a pipeline, or a list of names, that this pipeline diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index c6d8fedbd..f78b3f0a0 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -100,3 +100,22 @@ Version 10 :Prior Zuul version: 6.4.0 :Description: Renames admin_rules to authz_rules in unparsed abide. Affects schedulers and web. + +Version 11 +---------- + +:Prior Zuul version: 8.0.1 +:Description: Adds merge_modes to branch cache. Affects schedulers and web. + +Version 12 +---------- +:Prior Zuul version: 8.0.1 +:Description: Adds job_versions and build_versions to BuildSet. + Affects schedulers. + +Version 13 +---------- +:Prior Zuul version: 8.2.0 +:Description: Stores only the necessary event info as part of a queue item + instead of the full trigger event. + Affects schedulers. diff --git a/doc/source/governance.rst b/doc/source/governance.rst index 7393481cd..7bcdac44f 100644 --- a/doc/source/governance.rst +++ b/doc/source/governance.rst @@ -99,7 +99,7 @@ The Project Lead is elected to a term of one year. The election process shall be a Condorcet election and the candidates shall be self-nominated from among the existing Maintainers. -The Project Lead is James E. Blair (term expires 2023-01-13). +The Project Lead is James E. Blair (term expires 2024-01-16). 
Zuul-Jobs Maintainers --------------------- diff --git a/doc/source/job-content.rst b/doc/source/job-content.rst index 75044cf1c..d6bb07683 100644 --- a/doc/source/job-content.rst +++ b/doc/source/job-content.rst @@ -612,7 +612,6 @@ of item. The patchset identifier for the change. If a change is revised, this will have a different value. - .. var:: resources :type: dict @@ -706,14 +705,18 @@ are available: The commit or pull request message of the change base64 encoded. Use the `b64decode` filter in ansible when working with it. - .. code-block:: yaml + .. warning:: This variable is deprecated and will be removed in + a future version. Use :var:`zuul.change_message` + instead. + + .. var:: change_message - - hosts: all - tasks: - - name: Dump commit message - copy: - content: "{{ zuul.message | b64decode }}" - dest: "{{ zuul.executor.log_root }}/commit-message.txt" + The commit or pull request message of the change. When Zuul + runs Ansible, this variable is tagged with the ``!unsafe`` YAML + tag so that Ansible will not interpolate values into it. Note, + however, that the `inventory.yaml` file placed in the build's + workspace for debugging and inspection purposes does not inclued + the ``!unsafe`` tag. Branch Items diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst index 7f99c7f7f..f40bee445 100644 --- a/doc/source/monitoring.rst +++ b/doc/source/monitoring.rst @@ -444,6 +444,11 @@ These metrics are emitted by the Zuul :ref:`scheduler`: Indicates if the executor is paused. 1 means paused else 0. + .. stat:: pct_used_hdd + :type: gauge + + The used disk on this executor, as a percentage multiplied by 100. + .. stat:: pct_used_ram :type: gauge @@ -711,6 +716,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`: The size of the current connection event queue. + .. stat:: run_handler + :type: timer + + A timer metric reporting the time taken for one scheduler run + handler iteration. + .. 
stat:: time_query :type: timer diff --git a/noxfile.py b/noxfile.py new file mode 100644 index 000000000..30058cef7 --- /dev/null +++ b/noxfile.py @@ -0,0 +1,128 @@ +# Copyright 2022 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import multiprocessing +import os + +import nox + + +nox.options.error_on_external_run = True +nox.options.reuse_existing_virtualenvs = True +nox.options.sessions = ["tests-3", "linters"] + + +def set_env(session, var, default): + session.env[var] = os.environ.get(var, default) + + +def set_standard_env_vars(session): + set_env(session, 'OS_LOG_CAPTURE', '1') + set_env(session, 'OS_STDERR_CAPTURE', '1') + set_env(session, 'OS_STDOUT_CAPTURE', '1') + set_env(session, 'OS_TEST_TIMEOUT', '360') + session.env['PYTHONWARNINGS'] = ','.join([ + 'always::DeprecationWarning:zuul.driver.sql.sqlconnection', + 'always::DeprecationWarning:tests.base', + 'always::DeprecationWarning:tests.unit.test_database', + 'always::DeprecationWarning:zuul.driver.sql.alembic.env', + 'always::DeprecationWarning:zuul.driver.sql.alembic.script', + ]) + # Set PYTHONTRACEMALLOC to a value greater than 0 in the calling env + # to get tracebacks of that depth for ResourceWarnings. Disabled by + # default as this consumes more resources and is slow. 
+ set_env(session, 'PYTHONTRACEMALLOC', '0') + + +@nox.session(python='3') +def bindep(session): + set_standard_env_vars(session) + session.install('bindep') + session.run('bindep', 'test') + + +@nox.session(python='3') +def cover(session): + set_standard_env_vars(session) + session.env['PYTHON'] = 'coverage run --source zuul --parallel-mode' + session.install('-r', 'requirements.txt', + '-r', 'test-requirements.txt') + session.install('-e', '.') + session.run('stestr', 'run') + session.run('coverage', 'combine') + session.run('coverage', 'html', '-d', 'cover') + session.run('coverage', 'xml', '-o', 'cover/coverage.xml') + + +@nox.session(python='3') +def docs(session): + set_standard_env_vars(session) + session.install('-r', 'doc/requirements.txt', + '-r', 'test-requirements.txt') + session.install('-e', '.') + session.run('sphinx-build', '-E', '-W', '-d', 'doc/build/doctrees', + '-b', 'html', 'doc/source/', 'doc/build/html') + + +@nox.session(python='3') +def linters(session): + set_standard_env_vars(session) + session.install('flake8', 'openapi-spec-validator') + session.run('flake8') + session.run('openapi-spec-validator', 'web/public/openapi.yaml') + + +@nox.session(python='3') +def tests(session): + set_standard_env_vars(session) + session.install('-r', 'requirements.txt', + '-r', 'test-requirements.txt') + session.install('-e', '.') + session.run_always('tools/yarn-build.sh', external=True) + session.run_always('zuul-manage-ansible', '-v') + procs = max(int(multiprocessing.cpu_count() - 1), 1) + session.run('stestr', 'run', '--slowest', f'--concurrency={procs}', + *session.posargs) + + +@nox.session(python='3') +def remote(session): + set_standard_env_vars(session) + session.install('-r', 'requirements.txt', + '-r', 'test-requirements.txt') + session.install('-e', '.') + session.run_always('zuul-manage-ansible', '-v') + session.run('stestr', 'run', '--test-path', './tests/remote') + + +@nox.session(python='3') +def venv(session): + 
set_standard_env_vars(session) + session.install('-r', 'requirements.txt', + '-r', 'test-requirements.txt') + session.install('-e', '.') + session.run(*session.posargs) + + +@nox.session(python='3') +def zuul_client(session): + set_standard_env_vars(session) + session.install('zuul-client', + '-r', 'test-requirements.txt', + '-r', 'requirements.txt') + session.install('-e', '.') + session.run_always('zuul-manage-ansible', '-v') + session.run( + 'stestr', 'run', '--concurrency=1', + '--test-path', './tests/zuul_client') diff --git a/playbooks/zuul-tox/post-system-logs.yaml b/playbooks/zuul-nox/post-system-logs.yaml index 437411914..437411914 100644 --- a/playbooks/zuul-tox/post-system-logs.yaml +++ b/playbooks/zuul-nox/post-system-logs.yaml diff --git a/playbooks/zuul-tox/pre.yaml b/playbooks/zuul-nox/pre.yaml index c8c1c6500..c8c1c6500 100644 --- a/playbooks/zuul-tox/pre.yaml +++ b/playbooks/zuul-nox/pre.yaml diff --git a/releasenotes/notes/change_message-18207e18b5dfffd3.yaml b/releasenotes/notes/change_message-18207e18b5dfffd3.yaml new file mode 100644 index 000000000..1fd005684 --- /dev/null +++ b/releasenotes/notes/change_message-18207e18b5dfffd3.yaml @@ -0,0 +1,12 @@ +--- +features: + - | + The change message (commit message, or pull request message + depending on the driver) is now available in plain text form + annotated with the Ansible `!unsafe` tag as + :var:`zuul.change_message`. +deprecations: + - | + The base64 encoded version of the change message available as + :var:`zuul.message` is deprecated and will be removed in a future + version of Zuul. Use :var:`zuul.change_message` instead. 
diff --git a/releasenotes/notes/fix-exclude-include-priority-ea4a21ab1e53cb4a.yaml b/releasenotes/notes/fix-exclude-include-priority-ea4a21ab1e53cb4a.yaml new file mode 100644 index 000000000..9dd238faf --- /dev/null +++ b/releasenotes/notes/fix-exclude-include-priority-ea4a21ab1e53cb4a.yaml @@ -0,0 +1,12 @@ +--- +upgrade: + - | + Fixes `tenant.untrusted-projects.<project>.include-branches` being lower + priority than `tenant.untrusted-projects.<project>.exclude-branches` to + match the documentation and expected user behavior. + + This only affects projects that are using both `include-branches` and + `exclude-branches` at the same time. Now, `include-branches` has priority + over `exclude-branches` for any branches that match both. Practically + speaking, this means that `exclude-branches` is ignored if + `include-branches` is set. diff --git a/releasenotes/notes/non-live-pipeline-requirements-aa173bd86b332e63.yaml b/releasenotes/notes/non-live-pipeline-requirements-aa173bd86b332e63.yaml new file mode 100644 index 000000000..052d5b255 --- /dev/null +++ b/releasenotes/notes/non-live-pipeline-requirements-aa173bd86b332e63.yaml @@ -0,0 +1,29 @@ +--- +features: + - | + A new pipeline attribute, + :attr:`pipeline.allow-other-connections`, has been added + to ensure that only changes from connections which + are mentioned in the pipeline configuration (such as triggers, + reporters, or pipeline requirements) are enqueued. +security: + - | + Non-live items are now subject to pipeline requirements for + independent pipelines. + + Previously, an optimization for independent pipelines skipped + checking that a change met the pipeline requirements. If an + independent pipeline is intended only to run reviewed code, this + could allow running unreviewed code by updating dependent changes. + + Now both non-live and live items are subject to pipeline + requirements in all pipeline managers. 
+ + - | + The new `allow-other-connections` pipeline configuration option + may now be used to ensure that only changes from connections which + are mentioned in the pipeline configuration (such as triggers, + reporters, or pipeline requirements) are enqueued. This allows + the construction of a pipeline where, for example, code review + requirements are strictly enforced, even for dependencies which + are not normally directly enqueued. diff --git a/releasenotes/notes/paused-events-4adaade5e29fc10e.yaml b/releasenotes/notes/paused-events-4adaade5e29fc10e.yaml new file mode 100644 index 000000000..8db16102d --- /dev/null +++ b/releasenotes/notes/paused-events-4adaade5e29fc10e.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Zuul now stores the pause and resume events for a build together with + their timestamp and reports them to the SQL database and via MQTT. diff --git a/requirements.txt b/requirements.txt index a4aeb7f09..e12e9cf72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,10 +17,10 @@ tzlocal<3.0 # https://github.com/agronholm/apscheduler/discussions/570 PrettyTable>=0.6,<0.8 babel>=1.0 netaddr -kazoo>=2.8.0 -sqlalchemy +kazoo>=2.9.0 +sqlalchemy>=2.0.0 alembic -cryptography>=1.6 +cryptography>=39.0.0 cachecontrol<0.12.7 cachetools pyjwt>=2.0.0 @@ -60,3 +60,10 @@ allow_redefinition = True files = zuul ignore_missing_imports = True python_version = 3.6 + +[flake8] +# These are ignored intentionally in zuul projects; +# please don't submit patches that solely correct them or enable them. 
+ignore = E124,E125,E129,E252,E402,E741,H,W503,W504 +show-source = True +exclude = .venv,.tox,.nox,dist,doc,build,*.egg,node_modules diff --git a/tests/base.py b/tests/base.py index 731d6c3cc..fd927a92c 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1227,6 +1227,7 @@ class GerritWebServer(object): def stop(self): self.httpd.shutdown() self.thread.join() + self.httpd.server_close() class FakeGerritPoller(gerritconnection.GerritPoller): @@ -3625,10 +3626,11 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer): self.log.debug('No running builds to release') return - self.log.debug("Releasing build %s (%s)" % (regex, len(builds))) + self.log.debug("Releasing build %s %s (%s)" % ( + regex, change, len(builds))) for build in builds: - if (not regex or re.match(regex, build.name) and - not change or build.change == change): + if ((not regex or re.match(regex, build.name)) and + (not change or build.change == change)): self.log.debug("Releasing build %s" % (build.parameters['zuul']['build'])) build.release() @@ -4096,6 +4098,7 @@ class WebProxyFixture(fixtures.Fixture): def _cleanup(self): self.httpd.shutdown() self.thread.join() + self.httpd.server_close() class ZuulWebFixture(fixtures.Fixture): @@ -4345,6 +4348,11 @@ class BaseTestCase(testtools.TestCase): handler.setFormatter(formatter) logger = logging.getLogger() + # It is possible that a stderr log handler is inserted before our + # addHandler below. If that happens we will emit all logs to stderr + # even when we don't want to. Error here to make it clear there is + # a problem as early as possible as it is easy to overlook. 
+ self.assertEqual(logger.handlers, []) logger.setLevel(logging.DEBUG) logger.addHandler(handler) @@ -5173,6 +5181,11 @@ class ZuulTestCase(BaseTestCase): self.assertIsNotNone(build.start_time) self.assertIsNotNone(build.end_time) + def assertNoPipelineExceptions(self): + for tenant in self.scheds.first.sched.abide.tenants.values(): + for pipeline in tenant.layout.pipelines.values(): + self.assertEqual(0, pipeline._exception_count) + def assertFinalState(self): self.log.debug("Assert final state") # Make sure no jobs are running @@ -5199,6 +5212,7 @@ class ZuulTestCase(BaseTestCase): for pipeline in tenant.layout.pipelines.values(): if isinstance(pipeline.manager, ipm): self.assertEqual(len(pipeline.queues), 0) + self.assertNoPipelineExceptions() def shutdown(self): self.log.debug("Shutting down after tests") @@ -5625,6 +5639,7 @@ class ZuulTestCase(BaseTestCase): time.sleep(0.1) def refreshPipelines(self, sched): + ctx = None for tenant in sched.abide.tenants.values(): with tenant_read_lock(self.zk_client, tenant.name): for pipeline in tenant.layout.pipelines.values(): diff --git a/tests/fakegithub.py b/tests/fakegithub.py index 4255d5022..725c083e2 100644 --- a/tests/fakegithub.py +++ b/tests/fakegithub.py @@ -238,6 +238,11 @@ class FakeRepository(object): # List of branch protection rules self._branch_protection_rules = defaultdict(FakeBranchProtectionRule) + self._repodata = { + 'allow_merge_commit': True, + 'allow_squash_merge': True, + 'allow_rebase_merge': True, + } # fail the next commit requests with 404 self.fail_not_found = 0 @@ -350,6 +355,8 @@ class FakeRepository(object): return self.get_url_collaborators(request) if entity == 'commits': return self.get_url_commits(request, params=params) + if entity == '': + return self.get_url_repo() else: return None @@ -444,6 +451,9 @@ class FakeRepository(object): } return FakeResponse(data) + def get_url_repo(self): + return FakeResponse(self._repodata) + def pull_requests(self, state=None, sort=None, 
direction=None): # sort and direction are unused currently, but present to match # real world call signatures. @@ -752,7 +762,12 @@ class FakeGithubSession(object): return FakeResponse(check_run.as_dict(), 200) def get_repo(self, request, params=None): - org, project, request = request.split('/', 2) + parts = request.split('/', 2) + if len(parts) == 2: + org, project = parts + request = '' + else: + org, project, request = parts project_name = '{}/{}'.format(org, project) repo = self.client.repo_from_project(project_name) diff --git a/tests/fakegitlab.py b/tests/fakegitlab.py index 59b85ca96..e4a3e1ac8 100644 --- a/tests/fakegitlab.py +++ b/tests/fakegitlab.py @@ -276,3 +276,4 @@ class GitlabWebServer(object): def stop(self): self.httpd.shutdown() self.thread.join() + self.httpd.server_close() diff --git a/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml b/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml index 834c3cbe8..cea1ffe8c 100644 --- a/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml +++ b/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml @@ -4,3 +4,7 @@ copy: content: "{{ zuul.message | b64decode }}" dest: "{{ zuul.executor.log_root }}/commit-message.txt" + - name: Dump commit message + copy: + content: "{{ zuul.change_message }}" + dest: "{{ zuul.executor.log_root }}/change-message.txt" diff --git a/tests/fixtures/config/requirements/trusted-check/git/common-config/playbooks/base.yaml b/tests/fixtures/config/requirements/trusted-check/git/common-config/playbooks/base.yaml new file mode 100644 index 000000000..f679dceae --- /dev/null +++ b/tests/fixtures/config/requirements/trusted-check/git/common-config/playbooks/base.yaml @@ -0,0 +1,2 @@ +- hosts: all + tasks: [] diff --git a/tests/fixtures/config/requirements/trusted-check/git/common-config/zuul.yaml b/tests/fixtures/config/requirements/trusted-check/git/common-config/zuul.yaml new 
file mode 100644 index 000000000..494cd3cc7 --- /dev/null +++ b/tests/fixtures/config/requirements/trusted-check/git/common-config/zuul.yaml @@ -0,0 +1,41 @@ +- pipeline: + name: trusted-check + manager: independent + allow-other-connections: false + require: + gerrit: + approval: + - Code-Review: 2 + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + Verified: 1 + failure: + gerrit: + Verified: -1 + +- job: + name: base + parent: null + run: playbooks/base.yaml + nodeset: + nodes: + - label: ubuntu-xenial + name: controller + +- job: + name: check-job + +- project: + name: org/project + trusted-check: + jobs: + - check-job + +- project: + name: gh/project + trusted-check: + jobs: + - check-job diff --git a/tests/fixtures/config/requirements/trusted-check/git/gh_project/README b/tests/fixtures/config/requirements/trusted-check/git/gh_project/README new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/fixtures/config/requirements/trusted-check/git/gh_project/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/requirements/trusted-check/git/org_project/README b/tests/fixtures/config/requirements/trusted-check/git/org_project/README new file mode 100644 index 000000000..9daeafb98 --- /dev/null +++ b/tests/fixtures/config/requirements/trusted-check/git/org_project/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/requirements/trusted-check/main.yaml b/tests/fixtures/config/requirements/trusted-check/main.yaml new file mode 100644 index 000000000..c8deb36c1 --- /dev/null +++ b/tests/fixtures/config/requirements/trusted-check/main.yaml @@ -0,0 +1,11 @@ +- tenant: + name: tenant-one + source: + gerrit: + config-projects: + - common-config + untrusted-projects: + - org/project + github: + untrusted-projects: + - gh/project diff --git a/tests/fixtures/config/tenant-parser/exclude-include-branches.yaml b/tests/fixtures/config/tenant-parser/exclude-include-branches.yaml new file mode 100644 index 
000000000..6d455802a --- /dev/null +++ b/tests/fixtures/config/tenant-parser/exclude-include-branches.yaml @@ -0,0 +1,16 @@ +- tenant: + name: tenant-one + source: + gerrit: + config-projects: + - common-config + untrusted-projects: + - org/project1: + exclude-branches: + - foo + - bar + # Include branches has higher priority than exclude branches + include-branches: + - foo + - bar + - org/project2 diff --git a/tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml b/tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml index 3fcc43ce6..975b9713e 100644 --- a/tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml +++ b/tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml @@ -4,7 +4,7 @@ require: gerrit: approval: - - Verified: -1 + - email: for-check@example.com trigger: gerrit: - event: patchset-created @@ -24,7 +24,7 @@ require: gerrit: approval: - - Verified: 1 + - email: for-gate@example.com trigger: gerrit: - event: comment-added diff --git a/tests/fixtures/layouts/deps-by-topic.yaml b/tests/fixtures/layouts/deps-by-topic.yaml index 3824c5c2c..e7e8fc465 100644 --- a/tests/fixtures/layouts/deps-by-topic.yaml +++ b/tests/fixtures/layouts/deps-by-topic.yaml @@ -47,24 +47,27 @@ run: playbooks/run.yaml - job: - name: test-job + name: check-job + +- job: + name: gate-job - project: name: org/project1 queue: integrated check: jobs: - - test-job + - check-job gate: jobs: - - test-job + - gate-job - project: name: org/project2 queue: integrated check: jobs: - - test-job + - check-job gate: jobs: - - test-job + - gate-job diff --git a/tests/fixtures/layouts/github-merge-mode.yaml b/tests/fixtures/layouts/github-merge-mode.yaml new file mode 100644 index 000000000..139db886a --- /dev/null +++ b/tests/fixtures/layouts/github-merge-mode.yaml @@ -0,0 +1,21 @@ +- pipeline: + name: check + manager: independent + trigger: + github: + - event: 
pull_request_review + action: submitted + state: approve + start: + github: {} + success: + github: {} + failure: + github: {} + +- project: + name: org/project + merge-mode: rebase + gate: + jobs: + - noop diff --git a/tests/fixtures/layouts/two-check.yaml b/tests/fixtures/layouts/two-check.yaml new file mode 100644 index 000000000..bc3e59818 --- /dev/null +++ b/tests/fixtures/layouts/two-check.yaml @@ -0,0 +1,47 @@ +- pipeline: + name: check1 + manager: independent + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + Verified: 1 + failure: + gerrit: + Verified: -1 + +- pipeline: + name: check2 + manager: independent + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + Verified: 1 + failure: + gerrit: + Verified: -1 + +- job: + name: base + parent: null + run: playbooks/base.yaml + nodeset: + nodes: + - label: ubuntu-xenial + name: controller + +- job: + name: check-job + run: playbooks/check.yaml + +- project: + name: org/project + check1: + jobs: + - check-job + check2: + jobs: + - check-job diff --git a/tests/fixtures/zuul-gerrit-github.conf b/tests/fixtures/zuul-gerrit-github.conf index 0d03bacc2..8114159f7 100644 --- a/tests/fixtures/zuul-gerrit-github.conf +++ b/tests/fixtures/zuul-gerrit-github.conf @@ -5,6 +5,7 @@ server=127.0.0.1 [scheduler] tenant_config=main.yaml +relative_priority=true [merger] git_dir=/tmp/zuul-test/merger-git diff --git a/tests/make_playbooks.py b/tests/make_playbooks.py index 93c37bc81..cb7a98096 100755 --- a/tests/make_playbooks.py +++ b/tests/make_playbooks.py @@ -40,7 +40,8 @@ def handle_repo(path): config_path = os.path.join(path, fn) break try: - config = yaml.safe_load(open(config_path)) + with open(config_path) as f: + config = yaml.safe_load(f) except Exception: print(" Has yaml errors") return diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index 74d2cedf0..a3f9dda33 100644 --- a/tests/unit/test_circular_dependencies.py +++ 
b/tests/unit/test_circular_dependencies.py @@ -2267,8 +2267,8 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(B.patchsets[-1]["approvals"][0]["value"], "1") self.assertHistory([ - dict(name="test-job", result="SUCCESS", changes="2,1 1,1"), - dict(name="test-job", result="SUCCESS", changes="1,1 2,1"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="check-job", result="SUCCESS", changes="1,1 2,1"), ], ordered=False) A.addPatchset() @@ -2277,10 +2277,10 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertHistory([ # Original check run - dict(name="test-job", result="SUCCESS", changes="2,1 1,1"), - dict(name="test-job", result="SUCCESS", changes="1,1 2,1"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="check-job", result="SUCCESS", changes="1,1 2,1"), # Second check run - dict(name="test-job", result="SUCCESS", changes="2,1 1,2"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,2"), ], ordered=False) def test_deps_by_topic_multi_tenant(self): @@ -2332,6 +2332,121 @@ class TestGerritCircularDependencies(ZuulTestCase): dict(name="project6-job-t2", result="SUCCESS", changes="1,1 2,1"), ], ordered=False) + def test_dependency_refresh(self): + # Test that when two changes are put into a cycle, the + # dependencies are refreshed and items already in pipelines + # are updated. + self.executor_server.hold_jobs_in_build = True + + # This simulates the typical workflow where a developer only + # knows the change id of changes one at a time. + # The first change: + A = self.fake_gerrit.addFakeChange("org/project", "master", "A") + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Now that it has been uploaded, upload the second change and + # point it at the first. 
+ # B -> A + B = self.fake_gerrit.addFakeChange("org/project", "master", "B") + B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + B.subject, A.data["url"] + ) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Now that the second change is known, update the first change + # B <-> A + A.addPatchset() + A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + A.subject, B.data["url"] + ) + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2)) + self.waitUntilSettled() + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertHistory([ + dict(name="project-job", result="ABORTED", changes="1,1"), + dict(name="project-job", result="ABORTED", changes="1,1 2,1"), + dict(name="project-job", result="SUCCESS", changes="1,2 2,1"), + dict(name="project-job", result="SUCCESS", changes="2,1 1,2"), + ], ordered=False) + + @simple_layout('layouts/deps-by-topic.yaml') + def test_dependency_refresh_by_topic_check(self): + # Test that when two changes are put into a cycle, the + # dependencies are refreshed and items already in pipelines + # are updated. + self.executor_server.hold_jobs_in_build = True + + # This simulates the typical workflow where a developer + # uploads changes one at a time. + # The first change: + A = self.fake_gerrit.addFakeChange('org/project1', "master", "A", + topic='test-topic') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Now that it has been uploaded, upload the second change + # in the same topic. 
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B", + topic='test-topic') + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertHistory([ + dict(name="check-job", result="ABORTED", changes="1,1"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="check-job", result="SUCCESS", changes="1,1 2,1"), + ], ordered=False) + + @simple_layout('layouts/deps-by-topic.yaml') + def test_dependency_refresh_by_topic_gate(self): + # Test that when two changes are put into a cycle, the + # dependencies are refreshed and items already in pipelines + # are updated. + self.executor_server.hold_jobs_in_build = True + + # This simulates a workflow where a developer adds a change to + # a cycle already in gate. + A = self.fake_gerrit.addFakeChange('org/project1', "master", "A", + topic='test-topic') + B = self.fake_gerrit.addFakeChange('org/project2', "master", "B", + topic='test-topic') + A.addApproval("Code-Review", 2) + B.addApproval("Code-Review", 2) + A.addApproval("Approved", 1) + self.fake_gerrit.addEvent(B.addApproval("Approved", 1)) + self.waitUntilSettled() + + # Add a new change to the cycle. + C = self.fake_gerrit.addFakeChange('org/project1', "master", "C", + topic='test-topic') + self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + # At the end of this process, the gate jobs should be aborted + # because the new dpendency showed up. 
+ self.assertEqual(A.data["status"], "NEW") + self.assertEqual(B.data["status"], "NEW") + self.assertEqual(C.data["status"], "NEW") + self.assertHistory([ + dict(name="gate-job", result="ABORTED", changes="1,1 2,1"), + dict(name="gate-job", result="ABORTED", changes="1,1 2,1"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,1 3,1"), + ], ordered=False) + class TestGithubCircularDependencies(ZuulTestCase): config_file = "zuul-gerrit-github.conf" @@ -2524,3 +2639,48 @@ class TestGithubCircularDependencies(ZuulTestCase): B.comments[-1])) self.assertFalse(re.search('Change .*? is needed', B.comments[-1])) + + def test_dependency_refresh(self): + # Test that when two changes are put into a cycle, the + # dependencies are refreshed and items already in pipelines + # are updated. + self.executor_server.hold_jobs_in_build = True + + # This simulates the typical workflow where a developer only + # knows the PR id of changes one at a time. + # The first change: + A = self.fake_github.openFakePullRequest("gh/project", "master", "A") + self.fake_github.emitEvent(A.getPullRequestOpenedEvent()) + self.waitUntilSettled() + + # Now that it has been uploaded, upload the second change and + # point it at the first. 
+ # B -> A + B = self.fake_github.openFakePullRequest("gh/project", "master", "B") + B.body = "{}\n\nDepends-On: {}\n".format( + B.subject, A.url + ) + self.fake_github.emitEvent(B.getPullRequestOpenedEvent()) + self.waitUntilSettled() + + # Now that the second change is known, update the first change + # B <-> A + A.body = "{}\n\nDepends-On: {}\n".format( + A.subject, B.url + ) + + self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.subject)) + self.waitUntilSettled() + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertHistory([ + dict(name="project-job", result="ABORTED", + changes=f"{A.number},{A.head_sha}"), + dict(name="project-job", result="SUCCESS", + changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"), + dict(name="project-job", result="SUCCESS", + changes=f"{B.number},{B.head_sha} {A.number},{A.head_sha}"), + ], ordered=False) diff --git a/tests/unit/test_configloader.py b/tests/unit/test_configloader.py index d76eece12..81a05fc37 100644 --- a/tests/unit/test_configloader.py +++ b/tests/unit/test_configloader.py @@ -611,6 +611,12 @@ class TestTenantExcludeBranches(TestTenantIncludeBranches): # Same test results as include-branches +class TestTenantExcludeIncludeBranches(TestTenantIncludeBranches): + tenant_config_file = 'config/tenant-parser/exclude-include-branches.yaml' + + # Same test results as include-branches + + class TestTenantExcludeAll(TenantParserTestCase): tenant_config_file = 'config/tenant-parser/exclude-all.yaml' diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index bae4ff258..26a99215e 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -65,7 +65,7 @@ class TestSQLConnectionMysql(ZuulTestCase): def _sql_tables_created(self, connection_name): connection = self.scheds.first.connections.connections[connection_name] - insp = sa.engine.reflection.Inspector(connection.engine) + insp = 
sa.inspect(connection.engine) table_prefix = connection.table_prefix self.assertEqual(self.expected_table_prefix, table_prefix) @@ -82,7 +82,7 @@ class TestSQLConnectionMysql(ZuulTestCase): def _sql_indexes_created(self, connection_name): connection = self.scheds.first.connections.connections[connection_name] - insp = sa.engine.reflection.Inspector(connection.engine) + insp = sa.inspect(connection.engine) table_prefix = connection.table_prefix self.assertEqual(self.expected_table_prefix, table_prefix) @@ -127,99 +127,134 @@ class TestSQLConnectionMysql(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table])) + sa.sql.select(reporter.connection.zuul_buildset_table)) buildsets = result.fetchall() - self.assertEqual(4, len(buildsets)) + self.assertEqual(5, len(buildsets)) buildset0 = buildsets[0] buildset1 = buildsets[1] buildset2 = buildsets[2] buildset3 = buildsets[3] - - self.assertEqual('check', buildset0['pipeline']) - self.assertEqual('org/project', buildset0['project']) - self.assertEqual(1, buildset0['change']) - self.assertEqual('1', buildset0['patchset']) - self.assertEqual('SUCCESS', buildset0['result']) - self.assertEqual('Build succeeded.', buildset0['message']) - self.assertEqual('tenant-one', buildset0['tenant']) + buildset4 = buildsets[4] + + self.assertEqual('check', buildset0.pipeline) + self.assertEqual('org/project', buildset0.project) + self.assertEqual(1, buildset0.change) + self.assertEqual('1', buildset0.patchset) + self.assertEqual('SUCCESS', buildset0.result) + self.assertEqual('Build succeeded.', buildset0.message) + self.assertEqual('tenant-one', buildset0.tenant) self.assertEqual( - 'https://review.example.com/%d' % buildset0['change'], - buildset0['ref_url']) - self.assertNotEqual(None, buildset0['event_id']) - self.assertNotEqual(None, buildset0['event_timestamp']) + 'https://review.example.com/%d' % buildset0.change, + buildset0.ref_url) + self.assertNotEqual(None, 
buildset0.event_id) + self.assertNotEqual(None, buildset0.event_timestamp) buildset0_builds = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset0['id'] + buildset0.id ) ).fetchall() # Check the first result, which should be the project-merge job self.assertEqual( - 'project-merge', buildset0_builds[0]['job_name']) - self.assertEqual("SUCCESS", buildset0_builds[0]['result']) - self.assertEqual(None, buildset0_builds[0]['log_url']) - self.assertEqual('check', buildset1['pipeline']) - self.assertEqual('master', buildset1['branch']) - self.assertEqual('org/project', buildset1['project']) - self.assertEqual(2, buildset1['change']) - self.assertEqual('1', buildset1['patchset']) - self.assertEqual('FAILURE', buildset1['result']) - self.assertEqual('Build failed.', buildset1['message']) + 'project-merge', buildset0_builds[0].job_name) + self.assertEqual("SUCCESS", buildset0_builds[0].result) + self.assertEqual(None, buildset0_builds[0].log_url) + self.assertEqual('check', buildset1.pipeline) + self.assertEqual('master', buildset1.branch) + self.assertEqual('org/project', buildset1.project) + self.assertEqual(2, buildset1.change) + self.assertEqual('1', buildset1.patchset) + self.assertEqual('FAILURE', buildset1.result) + self.assertEqual('Build failed.', buildset1.message) buildset1_builds = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset1['id'] + buildset1.id ) ).fetchall() # Check the second result, which should be the project-test1 # job which failed self.assertEqual( - 'project-test1', buildset1_builds[1]['job_name']) - self.assertEqual("FAILURE", buildset1_builds[1]['result']) - self.assertEqual(None, buildset1_builds[1]['log_url']) + 'project-test1', buildset1_builds[1].job_name) + self.assertEqual("FAILURE", 
buildset1_builds[1].result) + self.assertEqual(None, buildset1_builds[1].log_url) buildset2_builds = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset2['id'] + buildset2.id ) ).fetchall() # Check the first result, which should be the project-publish # job self.assertEqual('project-publish', - buildset2_builds[0]['job_name']) - self.assertEqual("SUCCESS", buildset2_builds[0]['result']) + buildset2_builds[0].job_name) + self.assertEqual("SUCCESS", buildset2_builds[0].result) buildset3_builds = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset3['id'] + buildset3.id ) ).fetchall() self.assertEqual( - 'project-test1', buildset3_builds[1]['job_name']) - self.assertEqual('NODE_FAILURE', buildset3_builds[1]['result']) - self.assertEqual(None, buildset3_builds[1]['log_url']) - self.assertIsNotNone(buildset3_builds[1]['start_time']) - self.assertIsNotNone(buildset3_builds[1]['end_time']) + 'project-test1', buildset3_builds[1].job_name) + self.assertEqual('NODE_FAILURE', buildset3_builds[1].result) + self.assertEqual(None, buildset3_builds[1].log_url) + self.assertIsNotNone(buildset3_builds[1].start_time) + self.assertIsNotNone(buildset3_builds[1].end_time) self.assertGreaterEqual( - buildset3_builds[1]['end_time'], - buildset3_builds[1]['start_time']) + buildset3_builds[1].end_time, + buildset3_builds[1].start_time) + + # Check the paused build result + buildset4_builds = conn.execute( + sa.sql.select( + reporter.connection.zuul_build_table + ).where( + reporter.connection.zuul_build_table.c.buildset_id == + buildset4.id + ).order_by(reporter.connection.zuul_build_table.c.id) + ).fetchall() + + paused_build_events = conn.execute( + sa.sql.select( + reporter.connection.zuul_build_event_table + ).where( + 
reporter.connection.zuul_build_event_table.c.build_id + == buildset4_builds[0].id + ) + ).fetchall() + + self.assertEqual(len(paused_build_events), 2) + pause_event = paused_build_events[0] + resume_event = paused_build_events[1] + self.assertEqual( + pause_event.event_type, "paused") + self.assertIsNotNone(pause_event.event_time) + self.assertIsNone(pause_event.description) + self.assertEqual( + resume_event.event_type, "resumed") + self.assertIsNotNone(resume_event.event_time) + self.assertIsNone(resume_event.description) + + self.assertGreater( + resume_event.event_time, pause_event.event_time) self.executor_server.hold_jobs_in_build = True @@ -264,6 +299,23 @@ class TestSQLConnectionMysql(ZuulTestCase): self.orderedRelease() self.waitUntilSettled() + # We are pausing a job within this test, so holding the jobs in + # build and releasing them in order becomes difficult as the + # paused job will either be paused or waiting on the child jobs + # to start. + # As we are not interested in the order the jobs are running but + # only on the results in the database, simply deactivate + # hold_jobs_in_build. 
+ self.executor_server.hold_jobs_in_build = False + + # Add a paused build result + self.log.debug("Adding paused build result") + D = self.fake_gerrit.addFakeChange("org/project", "master", "D") + self.executor_server.returnData( + "project-merge", D, {"zuul": {"pause": True}}) + self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + check_results() def test_sql_results_retry_builds(self): @@ -281,51 +333,51 @@ class TestSQLConnectionMysql(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table]) + sa.sql.select(reporter.connection.zuul_buildset_table) ) buildsets = result.fetchall() self.assertEqual(1, len(buildsets)) buildset0 = buildsets[0] - self.assertEqual('check', buildset0['pipeline']) - self.assertEqual('org/project', buildset0['project']) - self.assertEqual(1, buildset0['change']) - self.assertEqual('1', buildset0['patchset']) - self.assertEqual('SUCCESS', buildset0['result']) - self.assertEqual('Build succeeded.', buildset0['message']) - self.assertEqual('tenant-one', buildset0['tenant']) + self.assertEqual('check', buildset0.pipeline) + self.assertEqual('org/project', buildset0.project) + self.assertEqual(1, buildset0.change) + self.assertEqual('1', buildset0.patchset) + self.assertEqual('SUCCESS', buildset0.result) + self.assertEqual('Build succeeded.', buildset0.message) + self.assertEqual('tenant-one', buildset0.tenant) self.assertEqual( - 'https://review.example.com/%d' % buildset0['change'], - buildset0['ref_url']) + 'https://review.example.com/%d' % buildset0.change, + buildset0.ref_url) buildset0_builds = conn.execute( sa.sql.select( - [reporter.connection.zuul_build_table] + reporter.connection.zuul_build_table ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset0['id'] + buildset0.id ) ).fetchall() # Check the retry results - self.assertEqual('project-merge', buildset0_builds[0]['job_name']) - self.assertEqual('SUCCESS', 
buildset0_builds[0]['result']) - self.assertTrue(buildset0_builds[0]['final']) - - self.assertEqual('project-test1', buildset0_builds[1]['job_name']) - self.assertEqual('RETRY', buildset0_builds[1]['result']) - self.assertFalse(buildset0_builds[1]['final']) - self.assertEqual('project-test2', buildset0_builds[2]['job_name']) - self.assertEqual('RETRY', buildset0_builds[2]['result']) - self.assertFalse(buildset0_builds[2]['final']) - - self.assertEqual('project-test1', buildset0_builds[3]['job_name']) - self.assertEqual('SUCCESS', buildset0_builds[3]['result']) - self.assertTrue(buildset0_builds[3]['final']) - self.assertEqual('project-test2', buildset0_builds[4]['job_name']) - self.assertEqual('SUCCESS', buildset0_builds[4]['result']) - self.assertTrue(buildset0_builds[4]['final']) + self.assertEqual('project-merge', buildset0_builds[0].job_name) + self.assertEqual('SUCCESS', buildset0_builds[0].result) + self.assertTrue(buildset0_builds[0].final) + + self.assertEqual('project-test1', buildset0_builds[1].job_name) + self.assertEqual('RETRY', buildset0_builds[1].result) + self.assertFalse(buildset0_builds[1].final) + self.assertEqual('project-test2', buildset0_builds[2].job_name) + self.assertEqual('RETRY', buildset0_builds[2].result) + self.assertFalse(buildset0_builds[2].final) + + self.assertEqual('project-test1', buildset0_builds[3].job_name) + self.assertEqual('SUCCESS', buildset0_builds[3].result) + self.assertTrue(buildset0_builds[3].final) + self.assertEqual('project-test2', buildset0_builds[4].job_name) + self.assertEqual('SUCCESS', buildset0_builds[4].result) + self.assertTrue(buildset0_builds[4].final) self.executor_server.hold_jobs_in_build = True @@ -378,7 +430,7 @@ class TestSQLConnectionMysql(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table]) + sa.sql.select(reporter.connection.zuul_buildset_table) ) buildsets = result.fetchall() @@ -387,10 +439,10 @@ class 
TestSQLConnectionMysql(ZuulTestCase): buildset0_builds = conn.execute( sa.sql.select( - [reporter.connection.zuul_build_table] + reporter.connection.zuul_build_table ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset0['id'] + buildset0.id ) ).fetchall() @@ -436,7 +488,7 @@ class TestSQLConnectionMysql(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table]) + sa.sql.select(reporter.connection.zuul_buildset_table) ) buildsets = result.fetchall() @@ -445,10 +497,10 @@ class TestSQLConnectionMysql(ZuulTestCase): buildset0_builds = conn.execute( sa.sql.select( - [reporter.connection.zuul_build_table] + reporter.connection.zuul_build_table ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset0['id'] + buildset0.id ) ).fetchall() @@ -743,6 +795,37 @@ class TestMQTTConnection(ZuulTestCase): self.assertIn('uuid', mqtt_payload) self.assertEquals(dependent_test_job['dependencies'], ['test']) + def test_mqtt_paused_job(self): + + A = self.fake_gerrit.addFakeChange("org/project", "master", "A") + # Let the job being paused via the executor + self.executor_server.returnData("test", A, {"zuul": {"pause": True}}) + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + success_event = self.mqtt_messages.pop() + + mqtt_payload = success_event["msg"] + self.assertEquals(mqtt_payload["project"], "org/project") + builds = mqtt_payload["buildset"]["builds"] + paused_job = [b for b in builds if b["job_name"] == "test"][0] + self.assertEquals(len(paused_job["events"]), 2) + + pause_event = paused_job["events"][0] + self.assertEquals(pause_event["event_type"], "paused") + self.assertGreater( + pause_event["event_time"], paused_job["start_time"]) + self.assertLess(pause_event["event_time"], paused_job["end_time"]) + + resume_event = paused_job["events"][1] + self.assertEquals(resume_event["event_type"], "resumed") + self.assertGreater( + 
resume_event["event_time"], paused_job["start_time"]) + self.assertLess(resume_event["event_time"], paused_job["end_time"]) + + self.assertGreater( + resume_event["event_time"], pause_event["event_time"]) + def test_mqtt_invalid_topic(self): in_repo_conf = textwrap.dedent( """ diff --git a/tests/unit/test_event_queues.py b/tests/unit/test_event_queues.py index eafd601e9..d93e088d8 100644 --- a/tests/unit/test_event_queues.py +++ b/tests/unit/test_event_queues.py @@ -637,6 +637,36 @@ class TestEventWatchers(EventQueueBaseTestCase): result_queues["other-tenant"]["post"].put(result_event) self._wait_for_event(event) + def test_pipeline_event_watcher_recreate(self): + event = threading.Event() + watcher = event_queues.EventWatcher(self.zk_client, event.set) + + management_queues = ( + event_queues.PipelineManagementEventQueue.createRegistry( + self.zk_client + ) + ) + self.assertFalse(event.is_set()) + + management_queues["tenant"]["check"].put(model.ReconfigureEvent()) + self._wait_for_event(event) + + # Wait for the watch to be fully established to avoid race + # conditions, since the event watcher will also ensure that the + # trigger and result event paths exist. 
+ for _ in iterate_timeout(5, "all watches to be established"): + if watcher.watched_pipelines: + break + + self.zk_client.client.delete( + event_queues.PIPELINE_NAME_ROOT.format( + tenant="tenant", pipeline="check"), recursive=True) + event.clear() + + management_queues["tenant"]["check"].initialize() + management_queues["tenant"]["check"].put(model.ReconfigureEvent()) + self._wait_for_event(event) + class TestConnectionEventQueue(EventQueueBaseTestCase): diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py index 2e3057af6..2a63d5ef8 100644 --- a/tests/unit/test_gerrit.py +++ b/tests/unit/test_gerrit.py @@ -957,3 +957,48 @@ class TestGerritConnection(ZuulTestCase): self.assertEqual(B.queried, 2) self.assertEqual(A.data['status'], 'MERGED') self.assertEqual(B.data['status'], 'MERGED') + + +class TestGerritUnicodeRefs(ZuulTestCase): + config_file = 'zuul-gerrit-web.conf' + tenant_config_file = 'config/single-tenant/main.yaml' + + upload_pack_data = (b'014452944ee370db5c87691e62e0f9079b6281319b4e HEAD' + b'\x00multi_ack thin-pack side-band side-band-64k ' + b'ofs-delta shallow deepen-since deepen-not ' + b'deepen-relative no-progress include-tag ' + b'multi_ack_detailed allow-tip-sha1-in-want ' + b'allow-reachable-sha1-in-want ' + b'symref=HEAD:refs/heads/faster filter ' + b'object-format=sha1 agent=git/2.37.1.gl1\n' + b'003d5f42665d737b3fd4ec22ca0209e6191859f09fd6 ' + b'refs/for/faster\n' + b'004952944ee370db5c87691e62e0f9079b6281319b4e ' + b'refs/heads/foo/\xf0\x9f\x94\xa5\xf0\x9f\x94\xa5' + b'\xf0\x9f\x94\xa5\n' + b'003f52944ee370db5c87691e62e0f9079b6281319b4e ' + b'refs/heads/faster\n0000').decode("utf-8") + + def test_mb_unicode_refs(self): + gerrit_config = { + 'user': 'gerrit', + 'server': 'localhost', + } + driver = GerritDriver() + gerrit = GerritConnection(driver, 'review_gerrit', gerrit_config) + + def _uploadPack(project): + return self.upload_pack_data + + self.patch(gerrit, '_uploadPack', _uploadPack) + + project = 
gerrit.source.getProject('org/project') + refs = gerrit.getInfoRefs(project) + + self.assertEqual(refs, + {'refs/for/faster': + '5f42665d737b3fd4ec22ca0209e6191859f09fd6', + 'refs/heads/foo/🔥🔥🔥': + '52944ee370db5c87691e62e0f9079b6281319b4e', + 'refs/heads/faster': + '52944ee370db5c87691e62e0f9079b6281319b4e'}) diff --git a/tests/unit/test_git_driver.py b/tests/unit/test_git_driver.py index 06e2ac7c8..95fca30d3 100644 --- a/tests/unit/test_git_driver.py +++ b/tests/unit/test_git_driver.py @@ -62,7 +62,8 @@ class TestGitDriver(ZuulTestCase): # Update zuul.yaml to force a tenant reconfiguration path = os.path.join(self.upstream_root, 'common-config', 'zuul.yaml') - config = yaml.safe_load(open(path, 'r').read()) + with open(path, 'r') as f: + config = yaml.safe_load(f) change = { 'name': 'org/project', 'check': { diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py index 47cc6c624..fb46aa7d1 100644 --- a/tests/unit/test_github_driver.py +++ b/tests/unit/test_github_driver.py @@ -18,8 +18,10 @@ import re from testtools.matchers import MatchesRegex, Not, StartsWith import urllib import socket +import threading import time import textwrap +from concurrent.futures import ThreadPoolExecutor from unittest import mock, skip import git @@ -32,10 +34,11 @@ from zuul.zk.layout import LayoutState from zuul.lib import strings from zuul.merger.merger import Repo from zuul.model import MergeRequest, EnqueueEvent, DequeueEvent +from zuul.zk.change_cache import ChangeKey from tests.base import (AnsibleZuulTestCase, BaseTestCase, ZuulGithubAppTestCase, ZuulTestCase, - simple_layout, random_sha1) + simple_layout, random_sha1, iterate_timeout) from tests.base import ZuulWebFixture EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) @@ -1463,6 +1466,65 @@ class TestGithubDriver(ZuulTestCase): self.assertEqual('SUCCESS', self.getJobFromHistory('project-test2').result) + @simple_layout('layouts/github-merge-mode.yaml', driver='github') + def 
test_merge_method_syntax_check(self): + """ + Tests that the merge mode gets forwarded to the reporter and the + PR was rebased. + """ + github = self.fake_github.getGithubClient() + repo = github.repo_from_project('org/project') + repo._repodata['allow_rebase_merge'] = False + self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() + + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + loading_errors = tenant.layout.loading_errors + self.assertEquals( + len(tenant.layout.loading_errors), 1, + "An error should have been stored") + self.assertIn( + "rebase not supported", + str(loading_errors[0].error)) + + @simple_layout("layouts/basic-github.yaml", driver="github") + def test_concurrent_get_change(self): + """ + Test that getting a change concurrently returns the same + object from the cache. + """ + conn = self.scheds.first.sched.connections.connections["github"] + + # Create a new change object and remove it from the cache so + # the concurrent call will try to create a new change object. + A = self.fake_github.openFakePullRequest("org/project", "master", "A") + change_key = ChangeKey(conn.connection_name, "org/project", + "PullRequest", str(A.number), str(A.head_sha)) + change = conn.getChange(change_key, refresh=True) + conn._change_cache.delete(change_key) + + # Acquire the update lock so the concurrent get task needs to + # wait for the lock to be release. + lock = conn._change_update_lock.setdefault(change_key, + threading.Lock()) + lock.acquire() + try: + executor = ThreadPoolExecutor(max_workers=1) + task = executor.submit(conn.getChange, change_key, refresh=True) + for _ in iterate_timeout(5, "task to be running"): + if task.running(): + break + # Add the change back so the waiting task can get the + # change from the cache. 
+ conn._change_cache.set(change_key, change) + finally: + lock.release() + executor.shutdown() + + other_change = task.result() + self.assertIsNotNone(other_change.cache_stat) + self.assertIs(change, other_change) + class TestMultiGithubDriver(ZuulTestCase): config_file = 'zuul-multi-github.conf' diff --git a/tests/unit/test_inventory.py b/tests/unit/test_inventory.py index 5b30a139f..40f858624 100644 --- a/tests/unit/test_inventory.py +++ b/tests/unit/test_inventory.py @@ -57,7 +57,8 @@ class TestInventoryBase(ZuulTestCase): build = self.getBuildByName(name) inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml') - inventory = yaml.safe_load(open(inv_path, 'r')) + with open(inv_path, 'r') as f: + inventory = yaml.safe_load(f) return inventory def _get_setup_inventory(self, name): @@ -65,7 +66,9 @@ class TestInventoryBase(ZuulTestCase): build = self.getBuildByName(name) setup_inv_path = build.jobdir.setup_playbook.inventory - return yaml.ansible_unsafe_load(open(setup_inv_path, 'r')) + with open(setup_inv_path, 'r') as f: + inventory = yaml.ansible_unsafe_load(f) + return inventory def runJob(self, name): self.hold_jobs_in_queue = False @@ -409,26 +412,34 @@ class TestAnsibleInventory(AnsibleZuulTestCase): build = self.history[0] inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml') - inventory = yaml.safe_load(open(inv_path, 'r')) + with open(inv_path, 'r') as f: + inventory = yaml.safe_load(f) zv_path = os.path.join(build.jobdir.root, 'ansible', 'zuul_vars.yaml') - zv = yaml.safe_load(open(zv_path, 'r')) + with open(zv_path, 'r') as f: + zv = yaml.ansible_unsafe_load(f) # TODO(corvus): zuul vars aren't really stored here anymore; # rework these tests to examine them separately. 
inventory['all']['vars'] = {'zuul': zv['zuul']} + # The deprecated base64 version decoded_message = base64.b64decode( inventory['all']['vars']['zuul']['message']).decode('utf-8') self.assertEqual(decoded_message, expected_message) - obtained_message = self._get_file(self.history[0], 'work/logs/commit-message.txt') + self.assertEqual(obtained_message, expected_message) + # The new !unsafe version + decoded_message = inventory['all']['vars']['zuul']['change_message'] + self.assertEqual(decoded_message, expected_message) + obtained_message = self._get_file(self.history[0], + 'work/logs/change-message.txt') self.assertEqual(obtained_message, expected_message) def test_jinja2_message_brackets(self): - self._jinja2_message("This message has {{ jinja2 }} in it ") + self._jinja2_message("This message has {{ ansible_host }} in it ") def test_jinja2_message_raw(self): self._jinja2_message("This message has {% raw %} in {% endraw %} it ") diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py index fd78726ab..f907cb8b4 100644 --- a/tests/unit/test_merger_repo.py +++ b/tests/unit/test_merger_repo.py @@ -16,6 +16,8 @@ import datetime import logging import os +import shutil +from unittest import mock import git import testtools @@ -161,6 +163,64 @@ class TestMergerRepo(ZuulTestCase): work_repo.reset() work_repo.checkout("foobar") + def test_rebase_merge_conflict_abort(self): + """Test that a failed rebase is properly aborted and related + directories are cleaned up.""" + parent_path = os.path.join(self.upstream_root, 'org/project1') + parent_repo = git.Repo(parent_path) + parent_repo.create_head("feature") + + files = {"test.txt": "master"} + self.create_commit("org/project1", files=files, head="master", + message="Add master file") + + files = {"test.txt": "feature"} + self.create_commit("org/project1", files=files, head="feature", + message="Add feature file") + + work_repo = Repo(parent_path, self.workspace_root, + "none@example.org", "User Name", "0", 
"0") + + item = {"ref": "refs/heads/feature"} + # We expect the rebase to fail because of a conflict, but the + # rebase will be aborted. + with testtools.ExpectedException(git.exc.GitCommandError): + work_repo.rebaseMerge(item, "master") + + # Assert that the failed rebase doesn't leave any temporary + # directories behind. + self.assertFalse( + os.path.exists(f"{work_repo.local_path}/.git/rebase-merge")) + self.assertFalse( + os.path.exists(f"{work_repo.local_path}/.git/rebase-apply")) + + def test_rebase_merge_conflict_reset_cleanup(self): + """Test temporary directories of a failed rebase merge are + removed on repo reset.""" + parent_path = os.path.join(self.upstream_root, 'org/project1') + parent_repo = git.Repo(parent_path) + parent_repo.create_head("feature") + + files = {"master.txt": "master"} + self.create_commit("org/project1", files=files, head="master", + message="Add master file") + + files = {"feature.txt": "feature"} + self.create_commit("org/project1", files=files, head="feature", + message="Add feature file") + + work_repo = Repo(parent_path, self.workspace_root, + "none@example.org", "User Name", "0", "0") + + # Simulate leftovers from a failed rebase + os.mkdir(f"{work_repo.local_path}/.git/rebase-merge") + os.mkdir(f"{work_repo.local_path}/.git/rebase-apply") + + # Resetting the repo should clean up any leaked directories + work_repo.reset() + item = {"ref": "refs/heads/feature"} + work_repo.rebaseMerge(item, "master") + def test_set_refs(self): parent_path = os.path.join(self.upstream_root, 'org/project1') remote_sha = self.create_commit('org/project1') @@ -678,6 +738,59 @@ class TestMergerRepo(ZuulTestCase): new_result = work_repo_underlying.commit('testtag') self.assertEqual(new_commit, new_result) + def test_set_remote_url_clone(self): + """Test that we always use the new Git URL for cloning. + + This is a regression test to make sure we always use the new + Git URL when a clone of the repo is necessary before updating + the config. 
+ """ + parent_path = os.path.join(self.upstream_root, 'org/project1') + work_repo = Repo(parent_path, self.workspace_root, + 'none@example.org', 'User Name', '0', '0') + + # Simulate an invalid/outdated remote URL with the repo no + # longer existing on the file system. + work_repo.remote_url = "file:///dev/null" + shutil.rmtree(work_repo.local_path) + + # Setting a valid remote URL should update the attribute and + # clone the repository. + work_repo.setRemoteUrl(parent_path) + self.assertEqual(work_repo.remote_url, parent_path) + self.assertTrue(os.path.exists(work_repo.local_path)) + + def test_set_remote_url_invalid(self): + """Test that we don't store the Git URL when failing to set it. + + This is a regression test to make sure we will always update + the Git URL after a previously failed attempt. + """ + parent_path = os.path.join(self.upstream_root, 'org/project1') + work_repo = Repo(parent_path, self.workspace_root, + 'none@example.org', 'User Name', '0', '0') + + # Set the Git remote URL to an invalid value. + invalid_url = "file:///dev/null" + repo = work_repo.createRepoObject(None) + work_repo._git_set_remote_url(repo, invalid_url) + work_repo.remote_url = invalid_url + + # Simulate a failed attempt to update the remote URL + with mock.patch.object(work_repo, "_git_set_remote_url", + side_effect=RuntimeError): + with testtools.ExpectedException(RuntimeError): + work_repo.setRemoteUrl(parent_path) + + # Make sure we cleared out the remote URL. + self.assertIsNone(work_repo.remote_url) + + # Setting a valid remote URL should update the attribute and + # clone the repository. 
+ work_repo.setRemoteUrl(parent_path) + self.assertEqual(work_repo.remote_url, parent_path) + self.assertTrue(os.path.exists(work_repo.local_path)) + class TestMergerWithAuthUrl(ZuulTestCase): config_file = 'zuul-github-driver.conf' diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py index aed40b94e..98f971948 100644 --- a/tests/unit/test_model.py +++ b/tests/unit/test_model.py @@ -32,7 +32,9 @@ import zuul.lib.connections from tests.base import BaseTestCase, FIXTURE_DIR from zuul.lib.ansible import AnsibleManager from zuul.lib import tracing +from zuul.model_api import MODEL_API from zuul.zk.zkobject import LocalZKContext +from zuul.zk.components import COMPONENT_REGISTRY from zuul import change_matcher @@ -44,6 +46,8 @@ class Dummy(object): class TestJob(BaseTestCase): def setUp(self): + COMPONENT_REGISTRY.registry = Dummy() + COMPONENT_REGISTRY.registry.model_api = MODEL_API self._env_fixture = self.useFixture( fixtures.EnvironmentVariable('HISTTIMEFORMAT', '%Y-%m-%dT%T%z ')) super(TestJob, self).setUp() diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index 8e4cdc20c..c6cdee7ea 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -17,6 +17,7 @@ import json from zuul.zk.components import ComponentRegistry from tests.base import ZuulTestCase, simple_layout, iterate_timeout +from tests.base import ZuulWebFixture def model_version(version): @@ -253,6 +254,72 @@ class TestModelUpgrade(ZuulTestCase): result='SUCCESS', changes='1,1'), ], ordered=False) + @model_version(11) + def test_model_11_12(self): + # This excercises the upgrade to store build/job versions + first = self.scheds.first + second = self.createScheduler() + second.start() + self.assertEqual(len(self.scheds), 2) + for _ in iterate_timeout(10, "until priming is complete"): + state_one = first.sched.local_layout_state.get("tenant-one") + if state_one: + break + + for _ in iterate_timeout( + 10, "all schedulers to have the 
same layout state"): + if (second.sched.local_layout_state.get( + "tenant-one") == state_one): + break + + self.executor_server.hold_jobs_in_build = True + with second.sched.layout_update_lock, second.sched.run_handler_lock: + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled(matcher=[first]) + + self.model_test_component_info.model_api = 12 + with first.sched.layout_update_lock, first.sched.run_handler_lock: + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled(matcher=[second]) + + self.waitUntilSettled() + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + dict(name='project1-project2-integration', + result='SUCCESS', changes='1,1'), + ], ordered=False) + + @model_version(12) + def test_model_12_13(self): + # Initially queue items will still have the full trigger event + # stored in Zookeeper. The trigger event will be converted to + # an event info object after the model API update. 
+ self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 1) + + # Upgrade our component + self.model_test_component_info.model_api = 13 + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + dict(name='project1-project2-integration', + result='SUCCESS', changes='1,1'), + ], ordered=False) + class TestGithubModelUpgrade(ZuulTestCase): config_file = 'zuul-github-driver.conf' @@ -305,6 +372,85 @@ class TestGithubModelUpgrade(ZuulTestCase): ], ordered=False) self.assertTrue(A.is_merged) + @model_version(10) + @simple_layout('layouts/github-merge-mode.yaml', driver='github') + def test_merge_method_syntax_check(self): + """ + Tests that the merge mode gets forwarded to the reporter and the + PR was rebased. + """ + webfixture = self.useFixture( + ZuulWebFixture(self.changes, self.config, + self.additional_event_queues, self.upstream_root, + self.poller_events, + self.git_url_with_auth, self.addCleanup, + self.test_root)) + sched = self.scheds.first.sched + web = webfixture.web + + github = self.fake_github.getGithubClient() + repo = github.repo_from_project('org/project') + repo._repodata['allow_rebase_merge'] = False + self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() + + # Verify that there are no errors with model version 9 (we + # should be using the defaultdict that indicates all merge + # modes are supported). 
+ tenant = sched.abide.tenants.get('tenant-one') + self.assertEquals(len(tenant.layout.loading_errors), 0) + + # Upgrade our component + self.model_test_component_info.model_api = 11 + + # Perform a smart reconfiguration which should not clear the + # cache; we should continue to see no errors because we should + # still be using the defaultdict. + self.scheds.first.smartReconfigure() + tenant = sched.abide.tenants.get('tenant-one') + self.assertEquals(len(tenant.layout.loading_errors), 0) + + # Wait for web to have the same config + for _ in iterate_timeout(10, "config is synced"): + if (web.tenant_layout_state.get('tenant-one') == + web.local_layout_state.get('tenant-one')): + break + + # Repeat the check + tenant = web.abide.tenants.get('tenant-one') + self.assertEquals(len(tenant.layout.loading_errors), 0) + + # Perform a full reconfiguration which should cause us to + # actually query, update the branch cache, and report an + # error. + self.scheds.first.fullReconfigure() + self.waitUntilSettled() + + tenant = sched.abide.tenants.get('tenant-one') + loading_errors = tenant.layout.loading_errors + self.assertEquals( + len(tenant.layout.loading_errors), 1, + "An error should have been stored in sched") + self.assertIn( + "rebase not supported", + str(loading_errors[0].error)) + + # Wait for web to have the same config + for _ in iterate_timeout(10, "config is synced"): + if (web.tenant_layout_state.get('tenant-one') == + web.local_layout_state.get('tenant-one')): + break + + # Repoat the check for web + tenant = web.abide.tenants.get('tenant-one') + loading_errors = tenant.layout.loading_errors + self.assertEquals( + len(tenant.layout.loading_errors), 1, + "An error should have been stored in web") + self.assertIn( + "rebase not supported", + str(loading_errors[0].error)) + class TestDeduplication(ZuulTestCase): config_file = "zuul-gerrit-github.conf" diff --git a/tests/unit/test_reporting.py b/tests/unit/test_reporting.py index 2cf93cdcb..0c5c5fbc9 100644 --- 
a/tests/unit/test_reporting.py +++ b/tests/unit/test_reporting.py @@ -151,7 +151,7 @@ class TestReporting(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table])) + sa.sql.select(reporter.connection.zuul_buildset_table)) buildsets = result.fetchall() for x in buildsets: @@ -180,7 +180,7 @@ class TestReporting(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table])) + sa.sql.select(reporter.connection.zuul_buildset_table)) buildsets = result.fetchall() for x in buildsets: diff --git a/tests/unit/test_requirements.py b/tests/unit/test_requirements.py index 9a32e4b21..9f3b87187 100644 --- a/tests/unit/test_requirements.py +++ b/tests/unit/test_requirements.py @@ -453,3 +453,40 @@ class TestRequirementsReject(ZuulTestCase): self.fake_gerrit.addEvent(comment) self.waitUntilSettled() self.assertEqual(len(self.history), 3) + + +class TestRequirementsTrustedCheck(ZuulTestCase): + config_file = "zuul-gerrit-github.conf" + tenant_config_file = "config/requirements/trusted-check/main.yaml" + + def test_non_live_requirements(self): + # Test that pipeline requirements are applied to non-live + # changes. 
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + B.setDependsOn(A, 1) + B.addApproval('Code-Review', 2) + + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + self.assertHistory([]) + + self.fake_gerrit.addEvent(A.addApproval('Code-Review', 2)) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + self.assertHistory([ + dict(name='check-job', result='SUCCESS', changes='1,1 2,1')], + ordered=False) + + def test_other_connections(self): + # Test allow-other-connections: False + A = self.fake_github.openFakePullRequest("gh/project", "master", "A") + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + B.subject, A.url, + ) + B.addApproval('Code-Review', 2) + + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + self.assertHistory([]) diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 6a0da1279..172ed34dc 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -52,8 +52,9 @@ from tests.base import ( skipIfMultiScheduler, ) from zuul.zk.change_cache import ChangeKey +from zuul.zk.event_queues import PIPELINE_NAME_ROOT from zuul.zk.layout import LayoutState -from zuul.zk.locks import management_queue_lock +from zuul.zk.locks import management_queue_lock, pipeline_lock from zuul.zk import zkobject EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) @@ -460,6 +461,7 @@ class TestScheduler(ZuulTestCase): 'zuul.mergers.online', value='1', kind='g') self.assertReportedStat('zuul.scheduler.eventqueues.connection.gerrit', value='0', kind='g') + self.assertReportedStat('zuul.scheduler.run_handler', kind='ms') # Catch time / monotonic errors for key in [ @@ -489,9 +491,10 @@ class TestScheduler(ZuulTestCase): 'zuul.tenant.tenant-one.pipeline.gate.write_objects', 
'zuul.tenant.tenant-one.pipeline.gate.read_znodes', 'zuul.tenant.tenant-one.pipeline.gate.write_znodes', - 'zuul.tenant.tenant-one.pipeline.gate.read_bytes', 'zuul.tenant.tenant-one.pipeline.gate.write_bytes', ]: + # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is + # expected to be zero since it's initialized after reading val = self.assertReportedStat(key, kind='g') self.assertTrue(0.0 < float(val) < 60000.0) @@ -2807,6 +2810,7 @@ class TestScheduler(ZuulTestCase): B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( B.subject, A.data['url']) + A.addApproval('Code-Review', 2) B.addApproval('Code-Review', 2) self.executor_server.hold_jobs_in_build = True @@ -3585,8 +3589,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3608,8 +3615,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3631,8 +3641,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = 
FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3653,8 +3666,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3676,8 +3692,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3713,6 +3732,22 @@ class TestScheduler(ZuulTestCase): self.assertEqual(self.history[4].pipeline, 'check') self.assertEqual(self.history[5].pipeline, 'check') + @simple_layout('layouts/two-check.yaml') + def test_query_dependency_count(self): + # Test that we efficiently query dependent changes + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + 
B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + B.subject, A.data['url']) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + # 1. The query to find the change id + # from the Depends-On string "change:1" (simpleQuery) + # 2. The query to populate the change once we know the id + # (queryChange) + self.assertEqual(A.queried, 2) + self.assertEqual(B.queried, 1) + def test_reconfigure_merge(self): """Test that two reconfigure events are merged""" # Wrap the recofiguration handler so we can count how many @@ -3925,6 +3960,10 @@ class TestScheduler(ZuulTestCase): else: time.sleep(0) + self.assertGreater(new.last_reconfigured, old.last_reconfigured) + self.assertGreater(new.last_reconfigure_event_ltime, + old.last_reconfigure_event_ltime) + def test_tenant_reconfiguration_command_socket(self): "Test that single-tenant reconfiguration via command socket works" @@ -6182,7 +6221,8 @@ For CI problems and help debugging, contact ci@example.org""" build = self.getBuildByName('check-job') inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml') - inventory = yaml.safe_load(open(inv_path, 'r')) + with open(inv_path, 'r') as f: + inventory = yaml.safe_load(f) label = inventory['all']['hosts']['controller']['nodepool']['label'] self.assertEqual('slow-label', label) @@ -6491,6 +6531,33 @@ For CI problems and help debugging, contact ci@example.org""" self.assertEqual(A.data['status'], 'MERGED') self.assertEqual(B.data['status'], 'MERGED') + def test_leaked_pipeline_cleanup(self): + self.waitUntilSettled() + sched = self.scheds.first.sched + + pipeline_state_path = "/zuul/tenant/tenant-one/pipeline/invalid" + self.zk_client.client.ensure_path(pipeline_state_path) + + # Create the ZK path as a side-effect of getting the event queue. 
+ sched.pipeline_management_events["tenant-one"]["invalid"] + pipeline_event_queue_path = PIPELINE_NAME_ROOT.format( + tenant="tenant-one", pipeline="invalid") + + self.assertIsNotNone(self.zk_client.client.exists(pipeline_state_path)) + # Wait for the event watcher to create the event queues + for _ in iterate_timeout(30, "create event queues"): + for event_queue in ("management", "trigger", "result"): + if self.zk_client.client.exists( + f"{pipeline_event_queue_path}/{event_queue}") is None: + break + else: + break + + sched._runLeakedPipelineCleanup() + self.assertIsNone( + self.zk_client.client.exists(pipeline_event_queue_path)) + self.assertIsNone(self.zk_client.client.exists(pipeline_state_path)) + class TestChangeQueues(ZuulTestCase): tenant_config_file = 'config/change-queues/main.yaml' diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py index 37a47c6ae..4f2110f3e 100644 --- a/tests/unit/test_sos.py +++ b/tests/unit/test_sos.py @@ -244,6 +244,79 @@ class TestScaleOutScheduler(ZuulTestCase): self.assertTrue(all(l == new.uuid for l in layout_uuids)) self.waitUntilSettled() + def test_live_reconfiguration_del_pipeline(self): + # Test pipeline deletion while changes are enqueued + + # Create a second scheduler instance + app = self.createScheduler() + app.start() + self.assertEqual(len(self.scheds), 2) + + for _ in iterate_timeout(10, "Wait until priming is complete"): + old = self.scheds.first.sched.tenant_layout_state.get("tenant-one") + if old is not None: + break + + for _ in iterate_timeout( + 10, "Wait for all schedulers to have the same layout state"): + layout_states = [a.sched.local_layout_state.get("tenant-one") + for a in self.scheds.instances] + if all(l == old for l in layout_states): + break + + pipeline_zk_path = app.sched.abide.tenants[ + "tenant-one"].layout.pipelines["check"].state.getPath() + + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + + # Let the first 
scheduler enqueue the change into the pipeline that + # will be removed later on. + with app.sched.run_handler_lock: + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled(matcher=[self.scheds.first]) + + # Process item only on second scheduler so the first scheduler has + # an outdated pipeline state. + with self.scheds.first.sched.run_handler_lock: + self.executor_server.release('.*-merge') + self.waitUntilSettled(matcher=[app]) + self.assertEqual(len(self.builds), 2) + + self.commitConfigUpdate( + 'common-config', + 'layouts/live-reconfiguration-del-pipeline.yaml') + # Trigger a reconfiguration on the first scheduler with the outdated + # pipeline state of the pipeline that will be removed. + self.scheds.execute(lambda a: a.sched.reconfigure(a.config), + matcher=[self.scheds.first]) + + new = self.scheds.first.sched.tenant_layout_state.get("tenant-one") + for _ in iterate_timeout( + 10, "Wait for all schedulers to have the same layout state"): + layout_states = [a.sched.local_layout_state.get("tenant-one") + for a in self.scheds.instances] + if all(l == new for l in layout_states): + break + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(A.reported, 0) + + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='ABORTED', changes='1,1'), + dict(name='project-test2', result='ABORTED', changes='1,1'), + ], ordered=False) + + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + self.assertEqual(len(tenant.layout.pipelines), 0) + stat = self.zk_client.client.exists(pipeline_zk_path) + self.assertIsNone(stat) + def test_change_cache(self): # Test re-using a change from the change cache. 
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index ba3117f59..7e6a2e635 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -138,6 +138,17 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase): s.close() self.streamer.stop() + def _readSocket(self, sock, build_uuid, event, name): + msg = "%s\r\n" % build_uuid + sock.sendall(msg.encode('utf-8')) + event.set() # notify we are connected and req sent + while True: + data = sock.recv(1024) + if not data: + break + self.streaming_data[name] += data.decode('utf-8') + sock.shutdown(socket.SHUT_RDWR) + def runFingerClient(self, build_uuid, gateway_address, event, name=None): # Wait until the gateway is started for x in iterate_timeout(30, "finger client to start"): @@ -154,7 +165,7 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase): self.streaming_data[name] = '' with socket.create_connection(gateway_address) as s: if self.fingergw_use_ssl: - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = False context.load_cert_chain( @@ -162,17 +173,10 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase): os.path.join(FIXTURE_DIR, 'fingergw/fingergw.key')) context.load_verify_locations( os.path.join(FIXTURE_DIR, 'fingergw/root-ca.pem')) - s = context.wrap_socket(s) - - msg = "%s\r\n" % build_uuid - s.sendall(msg.encode('utf-8')) - event.set() # notify we are connected and req sent - while True: - data = s.recv(1024) - if not data: - break - self.streaming_data[name] += data.decode('utf-8') - s.shutdown(socket.SHUT_RDWR) + with context.wrap_socket(s) as s: + self._readSocket(s, build_uuid, event, name) + else: + self._readSocket(s, build_uuid, event, name) def runFingerGateway(self, zone=None): self.log.info('Starting fingergw with zone %s', zone) diff --git a/tests/unit/test_v3.py 
b/tests/unit/test_v3.py index 81d95927a..004ede862 100644 --- a/tests/unit/test_v3.py +++ b/tests/unit/test_v3.py @@ -1557,6 +1557,43 @@ class TestInRepoConfig(ZuulTestCase): 'start_line': 5}, }) + def test_dynamic_config_job_anchors(self): + # Test the use of anchors in job configuration. This is a + # regression test designed to catch a failure where we freeze + # the first job and in doing so, mutate the vars dict. The + # intended behavior is that the two jobs end up with two + # separate python objects for their vars dicts. + in_repo_conf = textwrap.dedent( + """ + - job: + name: myvars + vars: &anchor + plugins: + foo: bar + + - job: + name: project-test1 + timeout: 999999999999 + vars: *anchor + + - project: + name: org/project + check: + jobs: + - project-test1 + """) + + file_dict = {'.zuul.yaml': in_repo_conf} + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A', + files=file_dict) + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + self.assertEqual(A.reported, 1, + "A should report failure") + self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1") + self.assertIn('max-job-timeout', A.messages[0]) + self.assertHistory([]) + def test_dynamic_config_non_existing_job_in_template(self): """Test that requesting a non existent job fails""" in_repo_conf = textwrap.dedent( @@ -5283,7 +5320,8 @@ class TestRoleBranches(RoleTestCase): def getBuildInventory(self, name): build = self.getBuildByName(name) inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml') - inventory = yaml.safe_load(open(inv_path, 'r')) + with open(inv_path, 'r') as f: + inventory = yaml.safe_load(f) return inventory def getCheckout(self, build, path): diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 7e3c19dfe..b5697ee36 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -18,6 +18,7 @@ import json import queue import threading import uuid +from unittest import mock import testtools @@ -53,10 
+54,12 @@ from tests.base import ( BaseTestCase, HoldableExecutorApi, HoldableMergerApi, iterate_timeout ) -from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext +from zuul.zk.zkobject import ( + ShardedZKObject, ZKObject, ZKContext +) from zuul.zk.locks import tenant_write_lock -from kazoo.exceptions import ZookeeperError, OperationTimeoutError +from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError class ZooKeeperBaseTestCase(BaseTestCase): @@ -2037,3 +2040,80 @@ class TestBlobStore(ZooKeeperBaseTestCase): with testtools.ExpectedException(KeyError): bs.get(path) + + +class TestPipelineInit(ZooKeeperBaseTestCase): + # Test the initialize-on-refresh code paths of various pipeline objects + + def test_pipeline_state_new_object(self): + # Test the initialize-on-refresh code path with no existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, pipeline.state) + context = ZKContext(self.zk_client, None, None, self.log) + pipeline.state.refresh(context) + self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath())) + self.assertEqual(pipeline.state.layout_uuid, layout.uuid) + + def test_pipeline_state_existing_object(self): + # Test the initialize-on-refresh code path with a pre-existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.manager = mock.Mock() + pipeline.state = model.PipelineState.create( + pipeline, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + # We refresh the change list here purely for the side effect + # of creating the pipeline state object with no data (the list + # is a subpath of the state object). 
+ pipeline.change_list.refresh(context) + pipeline.state.refresh(context) + self.assertTrue( + self.zk_client.client.exists(pipeline.change_list.getPath())) + self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath())) + self.assertEqual(pipeline.state.layout_uuid, layout.uuid) + + def test_pipeline_change_list_new_object(self): + # Test the initialize-on-refresh code path with no existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + pipeline.change_list.refresh(context) + self.assertTrue( + self.zk_client.client.exists(pipeline.change_list.getPath())) + pipeline.manager = mock.Mock() + pipeline.state.refresh(context) + self.assertEqual(pipeline.state.layout_uuid, layout.uuid) + + def test_pipeline_change_list_new_object_without_lock(self): + # Test the initialize-on-refresh code path if we don't have + # the lock. This should fail. 
+ tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + with testtools.ExpectedException(NoNodeError): + pipeline.change_list.refresh(context, allow_init=False) + self.assertIsNone( + self.zk_client.client.exists(pipeline.change_list.getPath())) + pipeline.manager = mock.Mock() + pipeline.state.refresh(context) + self.assertEqual(pipeline.state.layout_uuid, layout.uuid) diff --git a/tests/unit/test_zuultrigger.py b/tests/unit/test_zuultrigger.py index 25bfd4c30..f649f4723 100644 --- a/tests/unit/test_zuultrigger.py +++ b/tests/unit/test_zuultrigger.py @@ -35,9 +35,10 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase): A.addApproval('Code-Review', 2) B1.addApproval('Code-Review', 2) B2.addApproval('Code-Review', 2) - A.addApproval('Verified', 1) # required by gate - B1.addApproval('Verified', -1) # should go to check - B2.addApproval('Verified', 1) # should go to gate + A.addApproval('Verified', 1, username="for-check") # reqd by check + A.addApproval('Verified', 1, username="for-gate") # reqd by gate + B1.addApproval('Verified', 1, username="for-check") # go to check + B2.addApproval('Verified', 1, username="for-gate") # go to gate B1.addApproval('Approved', 1) B2.addApproval('Approved', 1) B1.setDependsOn(A, 1) @@ -75,9 +76,9 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase): self.scheds.first.sched, "addTriggerEvent", addTriggerEvent ): C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') - C.addApproval('Verified', -1) + C.addApproval('Verified', 1, username="for-check") D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D') - D.addApproval('Verified', -1) + D.addApproval('Verified', 1, username="for-check") D.setDependsOn(C, 1) 
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) @@ -108,6 +109,7 @@ class TestZuulTriggerParentChangeEnqueuedGithub(ZuulGithubAppTestCase): B1.addReview('derp', 'APPROVED') B2.addReview('derp', 'APPROVED') A.addLabel('for-gate') # required by gate + A.addLabel('for-check') # required by check B1.addLabel('for-check') # should go to check B2.addLabel('for-gate') # should go to gate diff --git a/tools/docker-compose.yaml b/tools/docker-compose.yaml index 83ab9f930..05b4905e2 100644 --- a/tools/docker-compose.yaml +++ b/tools/docker-compose.yaml @@ -3,7 +3,7 @@ version: "3" services: mysql: container_name: zuul-test-mysql - image: mysql:5.7 + image: mysql:8.0 environment: - MYSQL_ROOT_PASSWORD=insecure_worker ports: diff --git a/tools/test-setup-docker.sh b/tools/test-setup-docker.sh index a0fcf9f5a..1601b11a7 100755 --- a/tools/test-setup-docker.sh +++ b/tools/test-setup-docker.sh @@ -58,7 +58,8 @@ timeout 30 bash -c "until ${ROOTCMD} ${MYSQL} -e 'show databases'; do sleep 0.5; echo echo "Setting up permissions for zuul tests" -${ROOTCMD} ${MYSQL} -e "GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'%' identified by 'openstack_citest' WITH GRANT OPTION;" +${ROOTCMD} ${MYSQL} -e "CREATE USER 'openstack_citest'@'%' identified by 'openstack_citest';" +${ROOTCMD} ${MYSQL} -e "GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'%' WITH GRANT OPTION;" ${ROOTCMD} ${MYSQL} -u openstack_citest -popenstack_citest -e "SET default_storage_engine=MYISAM; DROP DATABASE IF EXISTS openstack_citest; CREATE DATABASE openstack_citest CHARACTER SET utf8;" echo "Finished" diff --git a/tools/yarn-build.sh b/tools/yarn-build.sh new file mode 100755 index 000000000..42eaffc7b --- /dev/null +++ b/tools/yarn-build.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# Copyright 2018 Red Hat, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -ex + +# This script checks if yarn is installed in the current path. If it is not, +# it will use nodeenv to install node, npm and yarn. +# Finally, it will install pip things. +if [[ ! $(command -v yarn) ]] +then + pip install nodeenv + # Initialize nodeenv and tell it to re-use the currently active virtualenv + attempts=0 + set +e + until nodeenv --python-virtualenv -n 16.14.0 ; do + ((attempts++)) + if [[ $attempts > 2 ]] + then + echo "Failed creating nodeenv" + exit 1 + fi + done + set -e + # Use -g because inside of the virtualenv '-g' means 'install into the' + # virtualenv - as opposed to installing into the local node_modules. + # Avoid writing a package-lock.json file since we don't use it. + # Avoid writing yarn into package.json. + npm install -g --no-package-lock --no-save yarn +fi +if [[ ! 
-f zuul/web/static/index.html ]] +then + mkdir -p zuul/web/static + ln -sfn ../zuul/web/static web/build + pushd web/ + if [[ -n "${YARN_REGISTRY}" ]] + then + echo "Using yarn registry: ${YARN_REGISTRY}" + sed -i "s#https://registry.yarnpkg.com#${YARN_REGISTRY}#" yarn.lock + fi + + # Be forgiving of package retrieval errors + attempts=0 + set +e + until yarn install; do + ((attempts++)) + if [[ $attempts > 2 ]] + then + echo "Failed installing npm packages" + exit 1 + fi + done + set -e + + yarn build + if [[ -n "${YARN_REGISTRY}" ]] + then + echo "Resetting yarn registry" + sed -i "s#${YARN_REGISTRY}#https://registry.yarnpkg.com#" yarn.lock + fi + popd +fi @@ -105,10 +105,3 @@ passenv = ZUUL_ZK_KEY commands = stestr run --test-path ./tests/remote {posargs} - -[flake8] -# These are ignored intentionally in zuul projects; -# please don't submit patches that solely correct them or enable them. -ignore = E124,E125,E129,E252,E402,E741,H,W503,W504 -show-source = True -exclude = .venv,.tox,dist,doc,build,*.egg,node_modules diff --git a/web/public/openapi.yaml b/web/public/openapi.yaml index d69111cf8..cb2e10b37 100644 --- a/web/public/openapi.yaml +++ b/web/public/openapi.yaml @@ -249,7 +249,7 @@ paths: - tenant /api/tenant/{tenant}/key/{project}.pub: get: - operationId: get-project-key + operationId: get-project-secrets-key parameters: - description: The tenant name in: path @@ -275,12 +275,44 @@ paths: ' schema: - description: The project public key + description: The project secrets public key in PKCS8 format type: string - description: Returns the project public key + description: Returns the project public key that is used to encrypt secrets '404': description: Tenant or Project not found - summary: Get a project public key + summary: Get a project public key that is used to encrypt secrets + tags: + - tenant + /api/tenant/{tenant}/project-ssh-key/{project}.pub: + get: + operationId: get-project-ssh-key + parameters: + - description: The tenant name + in: path + 
name: tenant + required: true + schema: + type: string + - description: The project name + in: path + name: project + required: true + schema: + type: string + responses: + '200': + content: + text/plain: + example: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACA + + ' + schema: + description: The project ssh public key in SSH2 format + type: string + description: Returns the project public key that executor adds to SSH agent + '404': + description: Tenant or Project not found + summary: Get a project public key that is used for SSH in post-merge pipelines tags: - tenant /api/tenant/{tenant}/semaphores: diff --git a/web/src/App.jsx b/web/src/App.jsx index 6a5c6e010..9a0caf551 100644 --- a/web/src/App.jsx +++ b/web/src/App.jsx @@ -32,6 +32,8 @@ import { ButtonVariant, Dropdown, DropdownItem, + DropdownToggle, + DropdownSeparator, KebabToggle, Modal, Nav, @@ -54,6 +56,7 @@ import { import { BellIcon, BookIcon, + ChevronDownIcon, CodeIcon, ServiceIcon, UsersIcon, @@ -67,6 +70,7 @@ import ConfigModal from './containers/config/Config' import logo from './images/logo.svg' import { clearNotification } from './actions/notifications' import { fetchConfigErrorsAction, clearConfigErrorsAction } from './actions/configErrors' +import { fetchTenantsIfNeeded } from './actions/tenants' import { routes } from './routes' import { setTenantAction } from './actions/tenant' import { configureAuthFromTenant, configureAuthFromInfo } from './actions/auth' @@ -81,6 +85,7 @@ class App extends React.Component { configErrorsReady: PropTypes.bool, info: PropTypes.object, tenant: PropTypes.object, + tenants: PropTypes.object, timezone: PropTypes.string, location: PropTypes.object, history: PropTypes.object, @@ -93,6 +98,7 @@ class App extends React.Component { state = { showErrors: false, + isTenantDropdownOpen: false, } renderMenu() { @@ -199,6 +205,7 @@ class App extends React.Component { } else if (!info.tenant) { // Multi tenant, look for tenant name in url whiteLabel = false + 
this.props.dispatch(fetchTenantsIfNeeded()) const match = matchPath( this.props.location.pathname, { path: '/t/:tenant' }) @@ -368,6 +375,91 @@ class App extends React.Component { ) } + renderTenantDropdown() { + const { tenant, tenants } = this.props + const { isTenantDropdownOpen } = this.state + + if (tenant.whiteLabel) { + return ( + <PageHeaderToolsItem> + <strong>Tenant</strong> {tenant.name} + </PageHeaderToolsItem> + ) + } else { + const tenantLink = (_tenant) => { + const currentPath = this.props.location.pathname + let suffix + switch (currentPath) { + case '/t/' + tenant.name + '/projects': + suffix = '/projects' + break + case '/t/' + tenant.name + '/jobs': + suffix = '/jobs' + break + case '/t/' + tenant.name + '/labels': + suffix = '/labels' + break + case '/t/' + tenant.name + '/nodes': + suffix = '/nodes' + break + case '/t/' + tenant.name + '/autoholds': + suffix = '/autoholds' + break + case '/t/' + tenant.name + '/builds': + suffix = '/builds' + break + case '/t/' + tenant.name + '/buildsets': + suffix = '/buildsets' + break + case '/t/' + tenant.name + '/status': + default: + // all other paths point to tenant-specific resources that would most likely result in a 404 + suffix = '/status' + break + } + return <Link to={'/t/' + _tenant.name + suffix}>{_tenant.name}</Link> + } + + const options = tenants.tenants.filter( + (_tenant) => (_tenant.name !== tenant.name) + ).map( + (_tenant, idx) => { + return ( + <DropdownItem key={'tenant-dropdown-' + idx} component={tenantLink(_tenant)} /> + ) + }) + options.push( + <DropdownSeparator key="tenant-dropdown-separator" />, + <DropdownItem + key="tenant-dropdown-tenants_page" + component={<Link to={tenant.defaultRoute}>Go to tenants page</Link>} /> + ) + + return (tenants.isFetching ? + <PageHeaderToolsItem> + Loading tenants ... 
+ </PageHeaderToolsItem> : + <> + <PageHeaderToolsItem> + <Dropdown + isOpen={isTenantDropdownOpen} + toggle={ + <DropdownToggle + className={`zuul-menu-dropdown-toggle${isTenantDropdownOpen ? '-expanded' : ''}`} + id="tenant-dropdown-toggle-id" + onToggle={(isOpen) => { this.setState({ isTenantDropdownOpen: isOpen }) }} + toggleIndicator={ChevronDownIcon} + > + <strong>Tenant</strong> {tenant.name} + </DropdownToggle>} + onSelect={() => { this.setState({ isTenantDropdownOpen: !isTenantDropdownOpen }) }} + dropdownItems={options} + /> + </PageHeaderToolsItem> + </>) + } + } + render() { const { isKebabDropdownOpen } = this.state const { notifications, configErrors, tenant, info, auth } = this.props @@ -406,7 +498,7 @@ class App extends React.Component { key="tenant" onClick={event => this.handleTenantLink(event)} > - <UsersIcon /> Tenant + <UsersIcon /> Tenants </DropdownItem> ) } @@ -445,15 +537,7 @@ class App extends React.Component { </Button> </a> </PageHeaderToolsItem> - {tenant.name && ( - <PageHeaderToolsItem> - <Link to={tenant.defaultRoute}> - <Button variant={ButtonVariant.plain}> - <strong>Tenant</strong> {tenant.name} - </Button> - </Link> - </PageHeaderToolsItem> - )} + {tenant.name && (this.renderTenantDropdown())} </PageHeaderToolsGroup> <PageHeaderToolsGroup> {/* this kebab dropdown replaces the icon buttons and is hidden for @@ -521,6 +605,7 @@ export default withRouter(connect( configErrorsReady: state.configErrors.ready, info: state.info, tenant: state.tenant, + tenants: state.tenants, timezone: state.timezone, user: state.user, auth: state.auth, diff --git a/web/src/App.test.jsx b/web/src/App.test.jsx index a1d0234d9..519980c7a 100644 --- a/web/src/App.test.jsx +++ b/web/src/App.test.jsx @@ -135,8 +135,9 @@ it('renders single tenant', async () => { // Link should be white-label scoped const topMenuLinks = application.root.findAllByType(Link) expect(topMenuLinks[0].props.to).toEqual('/status') - 
expect(topMenuLinks[3].props.to.pathname).toEqual('/status') - expect(topMenuLinks[4].props.to.pathname).toEqual('/projects') + expect(topMenuLinks[1].props.to).toEqual('/openapi') + expect(topMenuLinks[2].props.to.pathname).toEqual('/status') + expect(topMenuLinks[3].props.to.pathname).toEqual('/projects') // Location should be /status expect(location.pathname).toEqual('/status') // Info should tell white label tenant openstack diff --git a/web/src/containers/timezone/SelectTz.jsx b/web/src/containers/timezone/SelectTz.jsx index aaa585336..576645f6c 100644 --- a/web/src/containers/timezone/SelectTz.jsx +++ b/web/src/containers/timezone/SelectTz.jsx @@ -12,9 +12,9 @@ import PropTypes from 'prop-types' import React from 'react' -import Select from 'react-select' +import Select, { components } from 'react-select' import moment from 'moment-timezone' -import { OutlinedClockIcon } from '@patternfly/react-icons' +import { OutlinedClockIcon, ChevronDownIcon } from '@patternfly/react-icons' import { connect } from 'react-redux' import { setTimezoneAction } from '../../actions/timezone' @@ -58,7 +58,7 @@ class SelectTz extends React.Component { } render() { - const textColor = '#d1d1d1' + const textColor = '#fff' const containerStyles= { border: 'solid #2b2b2b', borderWidth: '0 0 0 1px', @@ -83,7 +83,11 @@ class SelectTz extends React.Component { }), dropdownIndicator:(provided) => ({ ...provided, - padding: '3px' + color: '#fff', + padding: '3px', + ':hover': { + color: '#fff' + } }), indicatorSeparator: () => {}, menu: (provided) => ({ @@ -93,12 +97,22 @@ class SelectTz extends React.Component { top: '22px', }) } + + const DropdownIndicator = (props) => { + return ( + <components.DropdownIndicator {...props}> + <ChevronDownIcon /> + </components.DropdownIndicator> + ) + } + return ( <div style={containerStyles}> <OutlinedClockIcon/> <Select className="zuul-select-tz" styles={customStyles} + components={{ DropdownIndicator }} value={this.state.currentValue} 
onChange={this.handleChange} options={this.state.availableTz} diff --git a/web/src/index.css b/web/src/index.css index 6fec50911..587804cfa 100644 --- a/web/src/index.css +++ b/web/src/index.css @@ -66,6 +66,21 @@ a.refresh { font-weight: bold; } +.zuul-menu-dropdown-toggle:before { + content: none !important; +} + +.zuul-menu-dropdown-toggle:hover { + border-bottom: none; +} + +.zuul-menu-dropdown-toggle-expanded:before { + border-left: none; + border-right: none; + border-top: none; + border-bottom: none; +} + /* Remove ugly outline when a Switch is selected */ .pf-c-switch { --pf-c-switch__input--focus__toggle--OutlineWidth: 0; diff --git a/zuul/ansible/logconfig.py b/zuul/ansible/logconfig.py index 66881336a..2d7c37463 100644 --- a/zuul/ansible/logconfig.py +++ b/zuul/ansible/logconfig.py @@ -140,7 +140,8 @@ def _read_config_file(filename: str): raise ValueError("Unable to read logging config file at %s" % filename) if os.path.splitext(filename)[1] in ('.yml', '.yaml', '.json'): - return yaml.safe_load(open(filename, 'r')) + with open(filename, 'r') as f: + return yaml.safe_load(f) return filename diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py index 6df796bca..9ce342502 100755 --- a/zuul/cmd/__init__.py +++ b/zuul/cmd/__init__.py @@ -105,8 +105,8 @@ class ZuulApp(object): self.commands = {} def _get_version(self): - from zuul.version import version_info as zuul_version_info - return "Zuul version: %s" % zuul_version_info.release_string() + from zuul.version import release_string + return "Zuul version: %s" % release_string def createParser(self): parser = argparse.ArgumentParser( @@ -226,12 +226,12 @@ class ZuulDaemonApp(ZuulApp, metaclass=abc.ABCMeta): def setup_logging(self, section, parameter): super(ZuulDaemonApp, self).setup_logging(section, parameter) - from zuul.version import version_info as zuul_version_info + from zuul.version import release_string log = logging.getLogger( "zuul.{section}".format(section=section.title())) log.debug( 
"Configured logging: {version}".format( - version=zuul_version_info.release_string())) + version=release_string)) def main(self): self.parseArguments() diff --git a/zuul/configloader.py b/zuul/configloader.py index 67d4494c4..fe22fe0f8 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -437,6 +437,30 @@ def ansible_vars_dict(value): ansible_var_name(key) +def copy_safe_config(conf): + """Return a deep copy of a config dictionary. + + This lets us assign values of a config dictionary to configuration + objects, even if those values are nested dictionaries. This way + we can safely freeze the configuration object (the process of + which mutates dictionaries) without mutating the original + configuration. + + Meanwhile, this does retain the original context information as a + single object (some behaviors rely on mutating the source context + (e.g., pragma)). + + """ + ret = copy.deepcopy(conf) + for key in ( + '_source_context', + '_start_mark', + ): + if key in conf: + ret[key] = conf[key] + return ret + + class PragmaParser(object): pragma = { 'implied-branch-matchers': bool, @@ -452,6 +476,7 @@ class PragmaParser(object): self.pcontext = pcontext def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) bm = conf.get('implied-branch-matchers') @@ -512,6 +537,7 @@ class NodeSetParser(object): return vs.Schema(nodeset) def fromYaml(self, conf, anonymous=False): + conf = copy_safe_config(conf) if anonymous: self.anon_schema(conf) self.anonymous = True @@ -599,6 +625,7 @@ class SecretParser(object): return vs.Schema(secret) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) s = model.Secret(conf['name'], conf['_source_context']) s.source_context = conf['_source_context'] @@ -723,6 +750,7 @@ class JobParser(object): def fromYaml(self, conf, project_pipeline=False, name=None, validate=True): + conf = copy_safe_config(conf) if validate: self.schema(conf) @@ -1075,6 +1103,7 @@ class ProjectTemplateParser(object): 
return vs.Schema(project) def fromYaml(self, conf, validate=True, freeze=True): + conf = copy_safe_config(conf) if validate: self.schema(conf) source_context = conf['_source_context'] @@ -1165,6 +1194,7 @@ class ProjectParser(object): return vs.Schema(project) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) project_name = conf.get('name') @@ -1292,6 +1322,7 @@ class PipelineParser(object): pipeline = {vs.Required('name'): str, vs.Required('manager'): manager, + 'allow-other-connections': bool, 'precedence': precedence, 'supercedes': to_list(str), 'description': str, @@ -1327,10 +1358,13 @@ class PipelineParser(object): return vs.Schema(pipeline) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) pipeline = model.Pipeline(conf['name'], self.pcontext.tenant) pipeline.source_context = conf['_source_context'] pipeline.start_mark = conf['_start_mark'] + pipeline.allow_other_connections = conf.get( + 'allow-other-connections', True) pipeline.description = conf.get('description') pipeline.supercedes = as_list(conf.get('supercedes', [])) @@ -1366,6 +1400,7 @@ class PipelineParser(object): # Make a copy to manipulate for backwards compat. 
conf_copy = conf.copy() + seen_connections = set() for conf_key, action in self.reporter_actions.items(): reporter_set = [] allowed_reporters = self.pcontext.tenant.allowed_reporters @@ -1379,6 +1414,7 @@ class PipelineParser(object): reporter_name, pipeline, params) reporter.setAction(conf_key) reporter_set.append(reporter) + seen_connections.add(reporter_name) setattr(pipeline, action, reporter_set) # If merge-conflict actions aren't explicit, use the failure actions @@ -1423,11 +1459,13 @@ class PipelineParser(object): source = self.pcontext.connections.getSource(source_name) manager.ref_filters.extend( source.getRequireFilters(require_config)) + seen_connections.add(source_name) for source_name, reject_config in conf.get('reject', {}).items(): source = self.pcontext.connections.getSource(source_name) manager.ref_filters.extend( source.getRejectFilters(reject_config)) + seen_connections.add(source_name) for connection_name, trigger_config in conf.get('trigger').items(): if self.pcontext.tenant.allowed_triggers is not None and \ @@ -1439,7 +1477,9 @@ class PipelineParser(object): manager.event_filters.extend( trigger.getEventFilters(connection_name, conf['trigger'][connection_name])) + seen_connections.add(connection_name) + pipeline.connections = list(seen_connections) # Pipelines don't get frozen return pipeline @@ -1460,6 +1500,7 @@ class SemaphoreParser(object): return vs.Schema(semaphore) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) semaphore = model.Semaphore(conf['name'], conf.get('max', 1)) semaphore.source_context = conf.get('_source_context') @@ -1485,6 +1526,7 @@ class QueueParser: return vs.Schema(queue) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) queue = model.Queue( conf['name'], @@ -1514,6 +1556,7 @@ class AuthorizationRuleParser(object): return vs.Schema(authRule) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) a = model.AuthZRuleTree(conf['name']) @@ 
-1547,6 +1590,7 @@ class GlobalSemaphoreParser(object): return vs.Schema(semaphore) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) semaphore = model.Semaphore(conf['name'], conf.get('max', 1), global_scope=True) @@ -1567,6 +1611,7 @@ class ApiRootParser(object): return vs.Schema(api_root) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) api_root = model.ApiRoot(conf.get('authentication-realm')) api_root.access_rules = conf.get('access-rules', []) @@ -1688,7 +1733,7 @@ class TenantParser(object): 'disallowed-labels': to_list(str), 'allow-circular-dependencies': bool, 'default-parent': str, - 'default-ansible-version': vs.Any(str, float), + 'default-ansible-version': vs.Any(str, float, int), 'access-rules': to_list(str), 'admin-rules': to_list(str), 'semaphores': to_list(str), @@ -1761,8 +1806,10 @@ class TenantParser(object): for branch_future in as_completed(branch_futures.keys()): tpc = branch_futures[branch_future] - source_context = model.ProjectContext( - tpc.project.canonical_name, tpc.project.name) + trusted, _ = tenant.getProject(tpc.project.canonical_name) + source_context = model.SourceContext( + tpc.project.canonical_name, tpc.project.name, + tpc.project.connection_name, None, None, trusted) with project_configuration_exceptions(source_context, loading_errors): self._getProjectBranches(tenant, tpc, branch_cache_min_ltimes) @@ -1771,8 +1818,8 @@ class TenantParser(object): # Set default ansible version default_ansible_version = conf.get('default-ansible-version') if default_ansible_version is not None: - # The ansible version can be interpreted as float by yaml so make - # sure it's a string. + # The ansible version can be interpreted as float or int + # by yaml so make sure it's a string. 
default_ansible_version = str(default_ansible_version) ansible_manager.requestVersion(default_ansible_version) else: @@ -1858,6 +1905,9 @@ class TenantParser(object): tpc.branches = static_branches tpc.dynamic_branches = always_dynamic_branches + tpc.merge_modes = tpc.project.source.getProjectMergeModes( + tpc.project, tenant, min_ltime) + def _loadProjectKeys(self, connection_name, project): project.private_secrets_key, project.public_secrets_key = ( self.keystorage.getProjectSecretsKeys( @@ -2209,6 +2259,12 @@ class TenantParser(object): job.source_context.branch) with self.unparsed_config_cache.writeLock( job.source_context.project_canonical_name): + # Prevent files cache ltime from going backward + if files_cache.ltime >= job.ltime: + self.log.info( + "Discarding job %s result since the files cache was " + "updated in the meantime", job) + continue # Since the cat job returns all required config files # for ALL tenants the project is a part of, we can # clear the whole cache and then populate it with the @@ -2577,11 +2633,23 @@ class TenantParser(object): # Set a merge mode if we don't have one for this project. # This can happen if there are only regex project stanzas # but no specific project stanzas. 
+ (trusted, project) = tenant.getProject(project_name) project_metadata = layout.getProjectMetadata(project_name) if project_metadata.merge_mode is None: - (trusted, project) = tenant.getProject(project_name) mode = project.source.getProjectDefaultMergeMode(project) project_metadata.merge_mode = model.MERGER_MAP[mode] + tpc = tenant.project_configs[project.canonical_name] + if tpc.merge_modes is not None: + source_context = model.SourceContext( + project.canonical_name, project.name, + project.connection_name, None, None, trusted) + with project_configuration_exceptions(source_context, + layout.loading_errors): + if project_metadata.merge_mode not in tpc.merge_modes: + mode = model.get_merge_mode_name( + project_metadata.merge_mode) + raise Exception(f'Merge mode {mode} not supported ' + f'by project {project_name}') def _parseLayout(self, tenant, data, loading_errors, layout_uuid=None): # Don't call this method from dynamic reconfiguration because diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py index 53be27d38..3d4ceec55 100644 --- a/zuul/connection/__init__.py +++ b/zuul/connection/__init__.py @@ -15,10 +15,8 @@ import abc import logging -from typing import List, Optional - from zuul.lib.logutil import get_annotated_logger -from zuul.model import Project +from zuul import model class ReadOnlyBranchCacheError(RuntimeError): @@ -143,8 +141,8 @@ class ZKBranchCacheMixin: read_only = False @abc.abstractmethod - def isBranchProtected(self, project_name: str, branch_name: str, - zuul_event_id) -> Optional[bool]: + def isBranchProtected(self, project_name, branch_name, + zuul_event_id): """Return if the branch is protected or None if the branch is unknown. 
:param str project_name: @@ -157,9 +155,35 @@ class ZKBranchCacheMixin: pass @abc.abstractmethod - def _fetchProjectBranches(self, project: Project, - exclude_unprotected: bool) -> List[str]: - pass + def _fetchProjectBranches(self, project, exclude_unprotected): + """Perform a remote query to determine the project's branches. + + Connection subclasses should implement this method. + + :param model.Project project: + The project. + :param bool exclude_unprotected: + Whether the query should exclude unprotected branches from + the response. + + :returns: A list of branch names. + """ + + def _fetchProjectMergeModes(self, project): + """Perform a remote query to determine the project's supported merge + modes. + + Connection subclasses should implement this method if they are + able to determine which merge modes apply for a project. The + default implementation returns that all merge modes are valid. + + :param model.Project project: + The project. + + :returns: A list of merge modes as model IDs. + + """ + return model.ALL_MERGE_MODES def clearConnectionCacheOnBranchEvent(self, event): """Update event and clear connection cache if needed. 
@@ -214,8 +238,7 @@ class ZKBranchCacheMixin: # Update them if we have them if protected_branches is not None: - protected_branches = self._fetchProjectBranches( - project, True) + protected_branches = self._fetchProjectBranches(project, True) self._branch_cache.setProjectBranches( project.name, True, protected_branches) @@ -223,6 +246,10 @@ class ZKBranchCacheMixin: all_branches = self._fetchProjectBranches(project, False) self._branch_cache.setProjectBranches( project.name, False, all_branches) + + merge_modes = self._fetchProjectMergeModes(project) + self._branch_cache.setProjectMergeModes( + project.name, merge_modes) self.log.info("Got branches for %s" % project.name) def getProjectBranches(self, project, tenant, min_ltime=-1): @@ -282,6 +309,62 @@ class ZKBranchCacheMixin: return sorted(branches) + def getProjectMergeModes(self, project, tenant, min_ltime=-1): + """Get the merge modes for the given project. + + :param zuul.model.Project project: + The project for which the merge modes are returned. + :param zuul.model.Tenant tenant: + The related tenant. + :param int min_ltime: + The minimum ltime to determine if we need to refresh the cache. + + :returns: The list of merge modes by model id. + """ + merge_modes = None + + if self._branch_cache: + try: + merge_modes = self._branch_cache.getProjectMergeModes( + project.name, min_ltime) + except LookupError: + if self.read_only: + # A scheduler hasn't attempted to fetch them yet + raise ReadOnlyBranchCacheError( + "Will not fetch merge modes as read-only is set") + else: + merge_modes = None + + if merge_modes is not None: + return merge_modes + elif self.read_only: + # A scheduler has previously attempted a fetch, but got + # the None due to an error; we can't retry since we're + # read-only. 
+ raise RuntimeError( + "Will not fetch merge_modes as read-only is set") + + # We need to perform a query + try: + merge_modes = self._fetchProjectMergeModes(project) + except Exception: + # We weren't able to get the merge modes. We need to tell + # future schedulers to try again but tell zuul-web that we + # tried and failed. Set the merge modes to None to indicate + # that we have performed a fetch and retrieved no data. Any + # time we encounter None in the cache, we will try again. + if self._branch_cache: + self._branch_cache.setProjectMergeModes( + project.name, None) + raise + self.log.info("Got merge modes for %s" % project.name) + + if self._branch_cache: + self._branch_cache.setProjectMergeModes( + project.name, merge_modes) + + return merge_modes + def checkBranchCache(self, project_name: str, event, protected: bool = None) -> None: """Clear the cache for a project when a branch event is processed diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 0a1f0ee61..276365e1d 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -1643,7 +1643,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): def getInfoRefs(self, project: Project) -> Dict[str, str]: try: - data = self._uploadPack(project) + # Encode the UTF-8 data back to a byte array, as the size of + # each record in the pack is in bytes, and so the slicing must + # also be done on a byte-basis. + data = self._uploadPack(project).encode("utf-8") except Exception: self.log.error("Cannot get references from %s" % project) raise # keeps error information @@ -1662,7 +1665,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): plen -= 4 if len(data) - i < plen: raise Exception("Invalid data in info/refs") - line = data[i:i + plen] + # Once the pack data is sliced, we can safely decode it back + # into a (UTF-8) string. 
+ line = data[i:i + plen].decode("utf-8") i += plen if not read_advertisement: read_advertisement = True diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index a2a55b050..182c83bae 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -48,6 +48,7 @@ from zuul.driver.github.graphql import GraphQLClient from zuul.lib import tracing from zuul.web.handler import BaseWebController from zuul.lib.logutil import get_annotated_logger +from zuul import model from zuul.model import Ref, Branch, Tag, Project from zuul.exceptions import MergeFailure from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent @@ -64,6 +65,12 @@ GITHUB_BASE_URL = 'https://api.github.com' PREVIEW_JSON_ACCEPT = 'application/vnd.github.machine-man-preview+json' PREVIEW_DRAFT_ACCEPT = 'application/vnd.github.shadow-cat-preview+json' PREVIEW_CHECKS_ACCEPT = 'application/vnd.github.antiope-preview+json' +ALL_MERGE_MODES = [ + model.MERGER_MERGE, + model.MERGER_MERGE_RESOLVE, + model.MERGER_SQUASH_MERGE, + model.MERGER_REBASE, +] # NOTE (felix): Using log levels for file comments / annotations is IMHO more # convenient than the values Github expects. 
Having in mind that those comments @@ -956,8 +963,9 @@ class GithubClientManager: if 'cache-control' in response.headers: del response.headers['cache-control'] + self._cache = DictCache() self.cache_adapter = cachecontrol.CacheControlAdapter( - DictCache(), + self._cache, cache_etags=True, heuristic=NoAgeHeuristic()) @@ -1453,6 +1461,7 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): log.debug("Change %s is currently being updated, " "waiting for it to finish", change) with lock: + change = self._change_cache.get(change_key) log.debug('Finished updating change %s', change) return change @@ -1776,6 +1785,42 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): return branches + def _fetchProjectMergeModes(self, project): + github = self.getGithubClient(project.name) + url = github.session.build_url('repos', project.name) + headers = {'Accept': 'application/vnd.github.loki-preview+json'} + merge_modes = [] + + # GitHub API bug: if the allow_* attributes below are changed, + # the ETag is not updated, meaning that once we cache the repo + # URL, we'll never update it. To avoid this, clear this URL + # from the cache before performing the request. 
+ self._github_client_manager._cache.data.pop(url, None) + + resp = github.session.get(url, headers=headers) + + if resp.status_code == 403: + self.log.error(str(resp)) + rate_limit = github.rate_limit() + if rate_limit['resources']['core']['remaining'] == 0: + self.log.warning( + "Rate limit exceeded, using full merge method list") + return ALL_MERGE_MODES + elif resp.status_code == 404: + raise Exception("Got status code 404 when fetching " + "project %s" % project.name) + + resp = resp.json() + if resp.get('allow_merge_commit'): + merge_modes.append(model.MERGER_MERGE) + merge_modes.append(model.MERGER_MERGE_RESOLVE) + if resp.get('allow_squash_merge'): + merge_modes.append(model.MERGER_SQUASH_MERGE) + if resp.get('allow_rebase_merge'): + merge_modes.append(model.MERGER_REBASE) + + return merge_modes + def isBranchProtected(self, project_name: str, branch_name: str, zuul_event_id=None) -> Optional[bool]: github = self.getGithubClient( diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py index bdc373f79..0a94a1730 100644 --- a/zuul/driver/github/githubsource.py +++ b/zuul/driver/github/githubsource.py @@ -135,6 +135,9 @@ class GithubSource(BaseSource): def getProjectBranches(self, project, tenant, min_ltime=-1): return self.connection.getProjectBranches(project, tenant, min_ltime) + def getProjectMergeModes(self, project, tenant, min_ltime=-1): + return self.connection.getProjectMergeModes(project, tenant, min_ltime) + def getProjectBranchCacheLtime(self): return self.connection._branch_cache.ltime diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index 0d56744e4..da423f085 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -633,17 +633,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): return change project = self.source.getProject(change_key.project_name) if not change: - if not event: - 
self.log.error("Change %s not found in cache and no event", - change_key) - if event: - url = event.change_url change = MergeRequest(project.name) change.project = project change.number = number # patch_number is the tips commit SHA of the MR change.patchset = change_key.revision - change.url = url or self.getMRUrl(project.name, number) + change.url = self.getMRUrl(project.name, number) change.uris = [change.url.split('://', 1)[-1]] # remove scheme log.debug("Getting change mr#%s from project %s" % ( diff --git a/zuul/driver/mqtt/mqttreporter.py b/zuul/driver/mqtt/mqttreporter.py index 5c95a19ea..8c3905e35 100644 --- a/zuul/driver/mqtt/mqttreporter.py +++ b/zuul/driver/mqtt/mqttreporter.py @@ -79,7 +79,8 @@ class MQTTReporter(BaseReporter): 'result': result, 'dependencies': [j.name for j in job.dependencies], 'artifacts': get_artifacts_from_result_data( - build.result_data, logger=log) + build.result_data, logger=log), + 'events': [e.toDict() for e in build.events], }) if include_returned_data: rdata = build.result_data.copy() diff --git a/zuul/driver/sql/alembic/env.py b/zuul/driver/sql/alembic/env.py index da7b3207f..17b67805e 100644 --- a/zuul/driver/sql/alembic/env.py +++ b/zuul/driver/sql/alembic/env.py @@ -53,7 +53,8 @@ def run_migrations_online(): connectable = engine_from_config( config.get_section(config.config_ini_section), prefix='sqlalchemy.', - poolclass=pool.NullPool) + poolclass=pool.NullPool, + future=True) # we can get the table prefix via the tag object tag = context.get_tag_argument() diff --git a/zuul/driver/sql/alembic/versions/0ed5def089e2_add_build_event_table.py b/zuul/driver/sql/alembic/versions/0ed5def089e2_add_build_event_table.py new file mode 100644 index 000000000..94892038c --- /dev/null +++ b/zuul/driver/sql/alembic/versions/0ed5def089e2_add_build_event_table.py @@ -0,0 +1,49 @@ +# Copyright 2022 BMW Group +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the 
License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""add_build_event_table + +Revision ID: 0ed5def089e2 +Revises: c7467b642498 +Create Date: 2022-12-12 12:08:20.882790 + +""" + +# revision identifiers, used by Alembic. +revision = '0ed5def089e2' +down_revision = 'c7467b642498' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + +BUILD_EVENT_TABLE = "zuul_build_event" +BUILD_TABLE = "zuul_build" + + +def upgrade(table_prefix=''): + op.create_table( + table_prefix + BUILD_EVENT_TABLE, + sa.Column("id", sa.Integer, primary_key=True), + sa.Column("build_id", sa.Integer, + sa.ForeignKey(table_prefix + BUILD_TABLE + ".id")), + sa.Column("event_time", sa.DateTime), + sa.Column("event_type", sa.String(255)), + sa.Column("description", sa.TEXT()), + ) + + +def downgrade(): + raise Exception("Downgrades not supported") diff --git a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py index 67581a6f9..1735d35f3 100644 --- a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py +++ b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py @@ -24,13 +24,16 @@ def upgrade(table_prefix=''): connection = op.get_bind() connection.execute( - """ - UPDATE {buildset_table} - SET result=( - SELECT CASE score - WHEN 1 THEN 'SUCCESS' - ELSE 'FAILURE' END) - """.format(buildset_table=table_prefix + BUILDSET_TABLE)) + sa.text( + """ + UPDATE {buildset_table} + SET result=( + SELECT CASE score + WHEN 1 THEN 'SUCCESS' + ELSE 'FAILURE' END) + 
""".format(buildset_table=table_prefix + BUILDSET_TABLE) + ) + ) op.drop_column(table_prefix + BUILDSET_TABLE, 'score') diff --git a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py index abfba7247..99d12d750 100644 --- a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py +++ b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py @@ -34,13 +34,16 @@ def upgrade(table_prefix=''): connection = op.get_bind() connection.execute( - """ - UPDATE {buildset_table} - SET updated=greatest( - coalesce(first_build_start_time, '1970-01-01 00:00:00'), - coalesce(last_build_end_time, '1970-01-01 00:00:00'), - coalesce(event_timestamp, '1970-01-01 00:00:00')) - """.format(buildset_table=table_prefix + "zuul_buildset")) + sa.text( + """ + UPDATE {buildset_table} + SET updated=greatest( + coalesce(first_build_start_time, '1970-01-01 00:00:00'), + coalesce(last_build_end_time, '1970-01-01 00:00:00'), + coalesce(event_timestamp, '1970-01-01 00:00:00')) + """.format(buildset_table=table_prefix + "zuul_buildset") + ) + ) def downgrade(): diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py index 8ab528c39..2d5c39ec3 100644 --- a/zuul/driver/sql/sqlconnection.py +++ b/zuul/driver/sql/sqlconnection.py @@ -29,6 +29,7 @@ from zuul.zk.locks import CONNECTION_LOCK_ROOT, locked, SessionAwareLock BUILDSET_TABLE = 'zuul_buildset' BUILD_TABLE = 'zuul_build' +BUILD_EVENTS_TABLE = 'zuul_build_event' ARTIFACT_TABLE = 'zuul_artifact' PROVIDES_TABLE = 'zuul_provides' @@ -307,27 +308,31 @@ class SQLConnection(BaseConnection): def _migrate(self, revision='head'): """Perform the alembic migrations for this connection""" + # Note that this method needs to be called with an external lock held. 
+ # The reason for this is we retrieve the alembic version and run the + # alembic migrations in different database transactions which opens + # us to races without an external lock. with self.engine.begin() as conn: context = alembic.migration.MigrationContext.configure(conn) current_rev = context.get_current_revision() - self.log.debug('Current migration revision: %s' % current_rev) - - config = alembic.config.Config() - config.set_main_option("script_location", - "zuul:driver/sql/alembic") - config.set_main_option("sqlalchemy.url", - self.connection_config.get('dburi'). - replace('%', '%%')) - - # Alembic lets us add arbitrary data in the tag argument. We can - # leverage that to tell the upgrade scripts about the table prefix. - tag = {'table_prefix': self.table_prefix} - - if current_rev is None and not self.force_migrations: - self.metadata.create_all(self.engine) - alembic.command.stamp(config, revision, tag=tag) - else: - alembic.command.upgrade(config, revision, tag=tag) + self.log.debug('Current migration revision: %s' % current_rev) + + config = alembic.config.Config() + config.set_main_option("script_location", + "zuul:driver/sql/alembic") + config.set_main_option("sqlalchemy.url", + self.connection_config.get('dburi'). + replace('%', '%%')) + + # Alembic lets us add arbitrary data in the tag argument. We can + # leverage that to tell the upgrade scripts about the table prefix. 
+ tag = {'table_prefix': self.table_prefix} + + if current_rev is None and not self.force_migrations: + self.metadata.create_all(self.engine) + alembic.command.stamp(config, revision, tag=tag) + else: + alembic.command.upgrade(config, revision, tag=tag) def onLoad(self, zk_client, component_registry=None): safe_connection = quote_plus(self.connection_name) @@ -446,6 +451,15 @@ class SQLConnection(BaseConnection): session.flush() return p + def createBuildEvent(self, *args, **kw): + session = orm.session.Session.object_session(self) + e = BuildEventModel(*args, **kw) + e.build_id = self.id + self.build_events.append(e) + session.add(e) + session.flush() + return e + class ArtifactModel(Base): __tablename__ = self.table_prefix + ARTIFACT_TABLE id = sa.Column(sa.Integer, primary_key=True) @@ -464,6 +478,19 @@ class SQLConnection(BaseConnection): name = sa.Column(sa.String(255)) build = orm.relationship(BuildModel, backref="provides") + class BuildEventModel(Base): + __tablename__ = self.table_prefix + BUILD_EVENTS_TABLE + id = sa.Column(sa.Integer, primary_key=True) + build_id = sa.Column(sa.Integer, sa.ForeignKey( + self.table_prefix + BUILD_TABLE + ".id")) + event_time = sa.Column(sa.DateTime) + event_type = sa.Column(sa.String(255)) + description = sa.Column(sa.TEXT()) + build = orm.relationship(BuildModel, backref="build_events") + + self.buildEventModel = BuildEventModel + self.zuul_build_event_table = self.buildEventModel.__table__ + self.providesModel = ProvidesModel self.zuul_provides_table = self.providesModel.__table__ diff --git a/zuul/driver/sql/sqlreporter.py b/zuul/driver/sql/sqlreporter.py index d16f50fcb..4d4970f35 100644 --- a/zuul/driver/sql/sqlreporter.py +++ b/zuul/driver/sql/sqlreporter.py @@ -163,6 +163,17 @@ class SQLReporter(BaseReporter): artifact['metadata'] = json.dumps( artifact['metadata']) db_build.createArtifact(**artifact) + + for event in build.events: + # Reformat the event_time so it's compatible to SQL. 
+ # Don't update the event object in place, but only + # the generated dict representation to not alter the + # datastructure for other reporters. + ev = event.toDict() + ev["event_time"] = datetime.datetime.fromtimestamp( + event.event_time, tz=datetime.timezone.utc) + db_build.createBuildEvent(**ev) + return db_build except sqlalchemy.exc.DBAPIError: if retry_count < self.retry_count - 1: diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py index 37fdcf580..dcc013788 100644 --- a/zuul/driver/timer/__init__.py +++ b/zuul/driver/timer/__init__.py @@ -226,6 +226,7 @@ class TimerDriver(Driver, TriggerInterface): event.branch = branch event.zuul_event_id = str(uuid4().hex) event.timestamp = time.time() + event.arrived_at_scheduler_timestamp = event.timestamp # Refresh the branch in order to update the item in the # change cache. change_key = project.source.getChangeKey(event) @@ -234,7 +235,9 @@ class TimerDriver(Driver, TriggerInterface): event=event) log = get_annotated_logger(self.log, event) log.debug("Adding event") - self.sched.addTriggerEvent(self.name, event) + self.sched.pipeline_trigger_events[tenant.name][ + pipeline_name + ].put(self.name, event) except Exception: self.log.exception("Error dispatching timer event for " "tenant %s project %s branch %s", diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 9aa38cbca..41e5e6916 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -118,6 +118,11 @@ class ExecutorClient(object): # Store the NodeRequest ID in the job arguments, so we can look it up # on the executor side to lock the nodes. 
req_id = build.build_set.getJobNodeRequestID(job.name) + if isinstance(req_id, dict): + # This should never happen + raise Exception( + "Attempt to start build with deduplicated node request ID " + f"{req_id}") if req_id: params["noderequest_id"] = req_id diff --git a/zuul/executor/common.py b/zuul/executor/common.py index ff4522d22..b8393903e 100644 --- a/zuul/executor/common.py +++ b/zuul/executor/common.py @@ -65,6 +65,7 @@ def construct_build_params(uuid, connections, job, item, pipeline, zuul_params['patchset'] = str(item.change.patchset) if hasattr(item.change, 'message'): zuul_params['message'] = strings.b64encode(item.change.message) + zuul_params['change_message'] = item.change.message if (hasattr(item.change, 'oldrev') and item.change.oldrev and item.change.oldrev != '0' * 40): zuul_params['oldrev'] = item.change.oldrev diff --git a/zuul/executor/server.py b/zuul/executor/server.py index c3737b5cc..a49bbbbbf 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -14,6 +14,7 @@ # under the License. 
import collections +import copy import datetime import json import logging @@ -1049,7 +1050,7 @@ class AnsibleJob(object): # The same, but frozen self.frozen_hostvars = {} # The zuul.* vars - self.zuul_vars = {} + self.debug_zuul_vars = {} self.waiting_for_semaphores = False def run(self): @@ -1888,7 +1889,8 @@ class AnsibleJob(object): logfile=json_output)) return try: - output = json.load(open(json_output, 'r')) + with open(json_output, 'r') as f: + output = json.load(f) last_playbook = output[-1] # Transform json to yaml - because it's easier to read and given # the size of the data it'll be extra-hard to read this as an @@ -2332,7 +2334,8 @@ class AnsibleJob(object): def prepareKubeConfig(self, jobdir, data): kube_cfg_path = jobdir.kubeconfig if os.path.exists(kube_cfg_path): - kube_cfg = yaml.safe_load(open(kube_cfg_path)) + with open(kube_cfg_path) as f: + kube_cfg = yaml.safe_load(f) else: kube_cfg = { 'apiVersion': 'v1', @@ -2495,10 +2498,18 @@ class AnsibleJob(object): if ri.role_path is not None], )) + # The zuul vars in the debug inventory.yaml file should not + # have any !unsafe tags, so save those before we update the + # execution version of those. + self.debug_zuul_vars = copy.deepcopy(zuul_vars) + if 'change_message' in zuul_vars: + zuul_vars['change_message'] = yaml.mark_strings_unsafe( + zuul_vars['change_message']) + with open(self.jobdir.zuul_vars, 'w') as zuul_vars_yaml: zuul_vars_yaml.write( - yaml.safe_dump({'zuul': zuul_vars}, default_flow_style=False)) - self.zuul_vars = zuul_vars + yaml.ansible_unsafe_dump({'zuul': zuul_vars}, + default_flow_style=False)) # Squash all and extra vars into localhost (it's not # explicitly listed). 
@@ -2552,7 +2563,7 @@ class AnsibleJob(object): inventory = make_inventory_dict( self.host_list, self.nodeset, self.original_hostvars) - inventory['all']['vars']['zuul'] = self.zuul_vars + inventory['all']['vars']['zuul'] = self.debug_zuul_vars with open(self.jobdir.inventory, 'w') as inventory_yaml: inventory_yaml.write( yaml.ansible_unsafe_dump( @@ -3481,6 +3492,8 @@ class ExecutorServer(BaseMergeServer): self.statsd.gauge(base_key + '.load_average', 0) self.statsd.gauge(base_key + '.pct_used_ram', 0) self.statsd.gauge(base_key + '.running_builds', 0) + self.statsd.close() + self.statsd = None # Use the BaseMergeServer's stop method to disconnect from # ZooKeeper. We do this as one of the last steps to ensure diff --git a/zuul/lib/encryption.py b/zuul/lib/encryption.py index 79e92e366..fd637b278 100644 --- a/zuul/lib/encryption.py +++ b/zuul/lib/encryption.py @@ -20,18 +20,6 @@ from cryptography.hazmat.primitives import hashes from functools import lru_cache -# OpenSSL 3.0.0 performs key validation in a very slow manner. Since -# our keys are internally generated and securely stored, we can skip -# validation. See https://github.com/pyca/cryptography/issues/7236 -backend = default_backend() -if hasattr(backend, '_rsa_skip_check_key'): - backend._rsa_skip_check_key = True -else: - import logging - logging.warning("Cryptography backend lacks _rsa_skip_check_key flag, " - "key loading may be slow") - - # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa/#generation def generate_rsa_keypair(): """Generate an RSA keypair. 
@@ -42,7 +30,7 @@ def generate_rsa_keypair(): private_key = rsa.generate_private_key( public_exponent=65537, key_size=4096, - backend=backend, + backend=default_backend(), ) public_key = private_key.public_key() return (private_key, public_key) @@ -110,7 +98,8 @@ def deserialize_rsa_keypair(data, password=None): private_key = serialization.load_pem_private_key( data, password=password, - backend=backend, + backend=default_backend(), + unsafe_skip_rsa_key_validation=True, ) public_key = private_key.public_key() return (private_key, public_key) diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index ad945c1b7..184c9762d 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -47,6 +47,18 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): self.fingergw = kwargs.pop('fingergw') super(RequestHandler, self).__init__(*args, **kwargs) + def _readSocket(self, sock, build_uuid): + # timeout only on the connection, let recv() wait forever + sock.settimeout(None) + msg = "%s\n" % build_uuid # Must have a trailing newline! + sock.sendall(msg.encode('utf-8')) + while True: + data = sock.recv(1024) + if data: + self.request.sendall(data) + else: + break + def _fingerClient(self, server, port, build_uuid, use_ssl): ''' Open a finger connection and return all streaming results. @@ -59,24 +71,16 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): ''' with socket.create_connection((server, port), timeout=10) as s: if use_ssl: - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = self.fingergw.tls_verify_hostnames context.load_cert_chain(self.fingergw.tls_cert, self.fingergw.tls_key) context.load_verify_locations(self.fingergw.tls_ca) - s = context.wrap_socket(s, server_hostname=server) - - # timeout only on the connection, let recv() wait forever - s.settimeout(None) - msg = "%s\n" % build_uuid # Must have a trailing newline! 
- s.sendall(msg.encode('utf-8')) - while True: - data = s.recv(1024) - if data: - self.request.sendall(data) - else: - break + with context.wrap_socket(s, server_hostname=server) as s: + self._readSocket(s, build_uuid) + else: + self._readSocket(s, build_uuid) def handle(self): ''' diff --git a/zuul/lib/repl.py b/zuul/lib/repl.py index ecefae9ea..63a800406 100644 --- a/zuul/lib/repl.py +++ b/zuul/lib/repl.py @@ -26,14 +26,14 @@ class ThreadLocalProxy(object): self.default = default def __getattr__(self, name): - obj = self.files.get(threading.currentThread(), self.default) + obj = self.files.get(threading.current_thread(), self.default) return getattr(obj, name) def register(self, obj): - self.files[threading.currentThread()] = obj + self.files[threading.current_thread()] = obj def unregister(self): - self.files.pop(threading.currentThread()) + self.files.pop(threading.current_thread()) class REPLHandler(socketserver.StreamRequestHandler): diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py index 04de4b8cb..a50fb4142 100644 --- a/zuul/lib/streamer_utils.py +++ b/zuul/lib/streamer_utils.py @@ -168,7 +168,7 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer): if all([self.server_ssl_key, self.server_ssl_cert, self.server_ssl_ca]): - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain(self.server_ssl_cert, self.server_ssl_key) context.load_verify_locations(self.server_ssl_ca) context.verify_mode = ssl.CERT_REQUIRED diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 65fa58d5b..e87e553d3 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -28,6 +28,8 @@ from zuul.model import ( ) from zuul.zk.change_cache import ChangeKey from zuul.zk.components import COMPONENT_REGISTRY +from zuul.zk.exceptions import LockException +from zuul.zk.locks import pipeline_lock from opentelemetry import trace @@ -95,21 +97,46 @@ class 
PipelineManager(metaclass=ABCMeta): def _postConfig(self): layout = self.pipeline.tenant.layout self.buildChangeQueues(layout) - with self.sched.createZKContext(None, self.log) as ctx,\ - self.currentContext(ctx): - # Make sure we have state and change list objects, and - # ensure that they exist in ZK. We don't hold the - # pipeline lock, but if they don't exist, that means they - # are new, so no one else will either, so the write on - # create is okay. If they do exist and we have an old - # object, we'll just reuse it. If it does exist and we - # don't have an old object, we'll get a new empty one. - # Regardless, these will not automatically refresh now, so - # they will be out of date until they are refreshed later. - self.pipeline.state = PipelineState.create( - self.pipeline, layout.uuid, self.pipeline.state) - self.pipeline.change_list = PipelineChangeList.create( - self.pipeline) + # Make sure we have state and change list objects. We + # don't actually ensure they exist in ZK here; these are + # just local objects until they are serialized the first + # time. Since we don't hold the pipeline lock, we can't + # reliably perform any read or write operations; we just + # need to ensure we have in-memory objects to work with + # and they will be initialized or loaded on the next + # refresh. + + # These will be out of date until they are refreshed later. + self.pipeline.state = PipelineState.create( + self.pipeline, self.pipeline.state) + self.pipeline.change_list = PipelineChangeList.create( + self.pipeline) + + # Now, try to acquire a non-blocking pipeline lock and refresh + # them for the side effect of initializing them if necessary. + # In the case of a new pipeline, no one else should have a + # lock anyway, and this helps us avoid emitting a whole bunch + # of errors elsewhere on startup when these objects don't + # exist. 
If the pipeline already exists and we can't acquire + # the lock, that's fine, we're much less likely to encounter + # read errors elsewhere in that case anyway. + try: + with pipeline_lock( + self.sched.zk_client, self.pipeline.tenant.name, + self.pipeline.name, blocking=False) as lock,\ + self.sched.createZKContext(lock, self.log) as ctx,\ + self.currentContext(ctx): + if not self.pipeline.state.exists(ctx): + # We only do this if the pipeline doesn't exist in + # ZK because in that case, this process should be + # fast since it's empty. If it does exist, + # refreshing it may be slow and since other actors + # won't encounter errors due to its absence, we + # would rather defer the work to later. + self.pipeline.state.refresh(ctx) + self.pipeline.change_list.refresh(ctx) + except LockException: + pass def buildChangeQueues(self, layout): self.log.debug("Building relative_priority queues") @@ -216,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta): and self.useDependenciesByTopic(change.project)) if (update_commit_dependencies or update_topic_dependencies): - self.updateCommitDependencies(change, None, event=None) + self.updateCommitDependencies(change, event=None) self._change_cache[change.cache_key] = change resolved_changes.append(change) return resolved_changes @@ -258,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta): return True return False - def isAnyVersionOfChangeInPipeline(self, change): - # Checks any items in the pipeline + def isChangeRelevantToPipeline(self, change): + # Checks if any version of the change or its deps matches any + # item in the pipeline. 
for change_key in self.pipeline.change_list.getChangeKeys(): if change.cache_stat.key.isSameChange(change_key): return True + if isinstance(change, model.Change): + for dep_change_ref in change.getNeedsChanges( + self.useDependenciesByTopic(change.project)): + dep_change_key = ChangeKey.fromReference(dep_change_ref) + if change.cache_stat.key.isSameChange(dep_change_key): + return True return False def isChangeAlreadyInQueue(self, change, change_queue): @@ -276,19 +310,19 @@ class PipelineManager(metaclass=ABCMeta): if not isinstance(change, model.Change): return - change_in_pipeline = False + to_refresh = set() for item in self.pipeline.getAllItems(): if not isinstance(item.change, model.Change): continue + if item.change.equals(change): + to_refresh.add(item.change) for dep_change_ref in item.change.commit_needs_changes: - if item.change.equals(change): - change_in_pipeline = True dep_change_key = ChangeKey.fromReference(dep_change_ref) if dep_change_key.isSameChange(change.cache_stat.key): - self.updateCommitDependencies(item.change, None, event) + to_refresh.add(item.change) - if change_in_pipeline: - self.updateCommitDependencies(change, None, event) + for existing_change in to_refresh: + self.updateCommitDependencies(existing_change, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: @@ -489,7 +523,8 @@ class PipelineManager(metaclass=ABCMeta): def addChange(self, change, event, quiet=False, enqueue_time=None, ignore_requirements=False, live=True, - change_queue=None, history=None, dependency_graph=None): + change_queue=None, history=None, dependency_graph=None, + skip_presence_check=False): log = get_annotated_logger(self.log, event) log.debug("Considering adding change %s" % change) @@ -504,10 +539,18 @@ class PipelineManager(metaclass=ABCMeta): # If we are adding a live change, check if it's a live item # anywhere in the pipeline. Otherwise, we will perform the # duplicate check below on the specific change_queue. 
- if live and self.isChangeAlreadyInPipeline(change): + if (live and + self.isChangeAlreadyInPipeline(change) and + not skip_presence_check): log.debug("Change %s is already in pipeline, ignoring" % change) return True + if ((not self.pipeline.allow_other_connections) and + (change.project.connection_name not in self.pipeline.connections)): + log.debug("Change %s is not from a connection known to %s ", + change, self.pipeline) + return False + if not ignore_requirements: for f in self.ref_filters: if f.connection_name != change.project.connection_name: @@ -531,7 +574,7 @@ class PipelineManager(metaclass=ABCMeta): # to date and this is a noop; otherwise, we need to refresh # them anyway. if isinstance(change, model.Change): - self.updateCommitDependencies(change, None, event) + self.updateCommitDependencies(change, event) with self.getChangeQueue(change, event, change_queue) as change_queue: if not change_queue: @@ -557,8 +600,10 @@ class PipelineManager(metaclass=ABCMeta): log.debug("History after enqueuing changes ahead: %s", history) if self.isChangeAlreadyInQueue(change, change_queue): - log.debug("Change %s is already in queue, ignoring" % change) - return True + if not skip_presence_check: + log.debug("Change %s is already in queue, ignoring", + change) + return True cycle = [] if isinstance(change, model.Change): @@ -592,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta): if enqueue_time: item.enqueue_time = enqueue_time item.live = live - self.reportStats(item, added=True) + self.reportStats(item, trigger_event=event) item.quiet = quiet if item.live: @@ -824,40 +869,56 @@ class PipelineManager(metaclass=ABCMeta): self.pipeline.tenant.name][other_pipeline.name].put( event, needs_result=False) - def updateCommitDependencies(self, change, change_queue, event): + def updateCommitDependencies(self, change, event): log = get_annotated_logger(self.log, event) - # Search for Depends-On headers and find appropriate changes - log.debug(" Updating commit dependencies 
for %s", change) - dependencies = [] - seen = set() - for match in find_dependency_headers(change.message): - log.debug(" Found Depends-On header: %s", match) - if match in seen: - continue - seen.add(match) - try: - url = urllib.parse.urlparse(match) - except ValueError: - continue - source = self.sched.connections.getSourceByHostname( - url.hostname) - if not source: - continue - log.debug(" Found source: %s", source) - dep = source.getChangeByURLWithRetry(match, event) - if dep and (not dep.is_merged) and dep not in dependencies: - log.debug(" Adding dependency: %s", dep) - dependencies.append(dep) - new_commit_needs_changes = [d.cache_key for d in dependencies] + must_update_commit_deps = ( + not hasattr(event, "zuul_event_ltime") + or change.commit_needs_changes is None + or change.cache_stat.mzxid <= event.zuul_event_ltime + ) - update_attrs = dict(commit_needs_changes=new_commit_needs_changes) + must_update_topic_deps = ( + self.useDependenciesByTopic(change.project) and ( + not hasattr(event, "zuul_event_ltime") + or change.topic_needs_changes is None + or change.cache_stat.mzxid <= event.zuul_event_ltime + ) + ) + + update_attrs = {} + if must_update_commit_deps: + # Search for Depends-On headers and find appropriate changes + log.debug(" Updating commit dependencies for %s", change) + dependencies = [] + seen = set() + for match in find_dependency_headers(change.message): + log.debug(" Found Depends-On header: %s", match) + if match in seen: + continue + seen.add(match) + try: + url = urllib.parse.urlparse(match) + except ValueError: + continue + source = self.sched.connections.getSourceByHostname( + url.hostname) + if not source: + continue + log.debug(" Found source: %s", source) + dep = source.getChangeByURLWithRetry(match, event) + if dep and (not dep.is_merged) and dep not in dependencies: + log.debug(" Adding dependency: %s", dep) + dependencies.append(dep) + new_commit_needs_changes = [d.cache_key for d in dependencies] + + update_attrs = 
dict(commit_needs_changes=new_commit_needs_changes) # Ask the source for any tenant-specific changes (this allows # drivers to implement their own way of collecting deps): source = self.sched.connections.getSource( change.project.connection_name) - if self.useDependenciesByTopic(change.project): + if must_update_topic_deps: log.debug(" Updating topic dependencies for %s", change) new_topic_needs_changes = [] for dep in source.getChangesByTopic(change.topic): @@ -866,7 +927,8 @@ class PipelineManager(metaclass=ABCMeta): new_topic_needs_changes.append(dep.cache_key) update_attrs['topic_needs_changes'] = new_topic_needs_changes - source.setChangeAttributes(change, **update_attrs) + if update_attrs: + source.setChangeAttributes(change, **update_attrs) def provisionNodes(self, item): log = item.annotateLogger(self.log) @@ -1286,8 +1348,9 @@ class PipelineManager(metaclass=ABCMeta): build_set = item.current_build_set # if base_sha is not available, fallback to branch - to_sha = getattr(item.change, "base_sha", - getattr(item.change, "branch", None)) + to_sha = getattr(item.change, "base_sha", None) + if to_sha is None: + to_sha = getattr(item.change, "branch", None) self.sched.merger.getFilesChanges( item.change.project.connection_name, item.change.project.name, @@ -1503,6 +1566,7 @@ class PipelineManager(metaclass=ABCMeta): log.info("Dequeuing change %s because " "it can no longer merge" % item.change) self.cancelJobs(item) + quiet_dequeue = False if item.isBundleFailing(): item.setDequeuedBundleFailing('Bundle is failing') elif not meets_reqs: @@ -1514,7 +1578,28 @@ class PipelineManager(metaclass=ABCMeta): else: msg = f'Change {clist} is needed.' item.setDequeuedNeedingChange(msg) - if item.live: + # If all the dependencies are already in the pipeline + # (but not ahead of this change), then we probably + # just added updated versions of them, possibly + # updating a cycle. In that case, attempt to + # re-enqueue this change with the updated deps. 
+ if (item.live and + all([self.isChangeAlreadyInPipeline(c) + for c in needs_changes])): + # Try enqueue, if that succeeds, keep this dequeue quiet + try: + log.info("Attempting re-enqueue of change %s", + item.change) + quiet_dequeue = self.addChange( + item.change, item.event, + enqueue_time=item.enqueue_time, + quiet=True, + skip_presence_check=True) + except Exception: + log.exception("Unable to re-enqueue change %s " + "which is missing dependencies", + item.change) + if item.live and not quiet_dequeue: try: self.reportItem(item) except exceptions.MergeFailure: @@ -1647,7 +1732,7 @@ class PipelineManager(metaclass=ABCMeta): if (item.live and not dequeued and self.sched.globals.use_relative_priority): priority = item.getNodePriority() - for request_id in item.current_build_set.node_requests.values(): + for _, request_id in item.current_build_set.getNodeRequests(): node_request = self.sched.nodepool.zk_nodepool.getNodeRequest( request_id, cached=True) if not node_request: @@ -1759,9 +1844,12 @@ class PipelineManager(metaclass=ABCMeta): if all_completed: self.sched.executor.resumeBuild(build) - build.updateAttributes( - build_set.item.pipeline.manager.current_context, - paused=False) + with build.activeContext(self.current_context): + build.paused = False + build.addEvent( + model.BuildEvent( + event_time=time.time(), + event_type=model.BuildEvent.TYPE_RESUMED)) def _resetDependentBuilds(self, build_set, build): job_graph = build_set.job_graph @@ -2143,7 +2231,7 @@ class PipelineManager(metaclass=ABCMeta): log.error("Reporting item %s received: %s", item, ret) return action, (not ret) - def reportStats(self, item, added=False): + def reportStats(self, item, trigger_event=None): if not self.sched.statsd: return try: @@ -2182,18 +2270,21 @@ class PipelineManager(metaclass=ABCMeta): if dt: self.sched.statsd.timing(key + '.resident_time', dt) self.sched.statsd.incr(key + '.total_changes') - if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'): + if ( + 
trigger_event + and hasattr(trigger_event, 'arrived_at_scheduler_timestamp') + ): now = time.time() - arrived = item.event.arrived_at_scheduler_timestamp + arrived = trigger_event.arrived_at_scheduler_timestamp processing = (now - arrived) * 1000 - elapsed = (now - item.event.timestamp) * 1000 + elapsed = (now - trigger_event.timestamp) * 1000 self.sched.statsd.timing( basekey + '.event_enqueue_processing_time', processing) self.sched.statsd.timing( basekey + '.event_enqueue_time', elapsed) self.reportPipelineTiming('event_enqueue_time', - item.event.timestamp) + trigger_event.timestamp) except Exception: self.log.exception("Exception reporting pipeline stats") diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index aeeff5adc..ca52b37fe 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -61,13 +61,15 @@ class IndependentPipelineManager(PipelineManager): for needed_change in needed_changes: # This differs from the dependent pipeline by enqueuing # changes ahead as "not live", that is, not intended to - # have jobs run. Also, pipeline requirements are always - # ignored (which is safe because the changes are not - # live). + # have jobs run. Pipeline requirements are still in place + # in order to avoid unreviewed code being executed in + # pipelines that require review. 
if needed_change not in history: r = self.addChange(needed_change, event, quiet=True, - ignore_requirements=True, live=False, - change_queue=change_queue, history=history, + ignore_requirements=ignore_requirements, + live=False, + change_queue=change_queue, + history=history, dependency_graph=dependency_graph) if not r: return False diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index d400c253b..e4688a1b7 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -333,6 +333,26 @@ class Repo(object): os.rmdir(root) @staticmethod + def _cleanup_leaked_rebase_dirs(local_path, log, messages): + for rebase_dir in [".git/rebase-merge", ".git/rebase-apply"]: + leaked_dir = os.path.join(local_path, rebase_dir) + if not os.path.exists(leaked_dir): + continue + if log: + log.debug("Cleaning leaked %s dir", leaked_dir) + else: + messages.append( + f"Cleaning leaked {leaked_dir} dir") + try: + shutil.rmtree(leaked_dir) + except Exception as exc: + msg = f"Failed to remove leaked {leaked_dir} dir:" + if log: + log.exception(msg) + else: + messages.append(f"{msg}\n{exc}") + + @staticmethod def refNameToZuulRef(ref_name: str) -> str: return "refs/zuul/{}".format( hashlib.sha1(ref_name.encode("utf-8")).hexdigest() @@ -384,6 +404,8 @@ class Repo(object): messages.append("Delete stale Zuul ref {}".format(ref)) Repo._deleteRef(ref.path, repo) + Repo._cleanup_leaked_rebase_dirs(local_path, log, messages) + # Note: Before git 2.13 deleting a a ref foo/bar leaves an empty # directory foo behind that will block creating the reference foo # in the future. 
As a workaround we must clean up empty directories @@ -615,7 +637,11 @@ class Repo(object): self.fetch(ref, zuul_event_id=zuul_event_id) log.debug("Rebasing %s with args %s", ref, args) repo.git.checkout('FETCH_HEAD') - repo.git.rebase(*args) + try: + repo.git.rebase(*args) + except Exception: + repo.git.rebase(abort=True) + raise return repo.head.commit def fetch(self, ref, zuul_event_id=None): @@ -757,8 +783,19 @@ class Repo(object): return log = get_annotated_logger(self.log, zuul_event_id) log.debug("Set remote url to %s", redact_url(url)) - self._git_set_remote_url(self.createRepoObject(zuul_event_id), url) - self.remote_url = url + try: + # Update the remote URL as it is used for the clone if the + # repo doesn't exist. + self.remote_url = url + self._git_set_remote_url( + self.createRepoObject(zuul_event_id), self.remote_url) + except Exception: + # Clear out the stored remote URL so we will always set + # the Git URL after a failed attempt. This prevents us from + # using outdated credentials that might still be stored in + # the Git config as part of the URL. + self.remote_url = None + raise def mapLine(self, commit, filename, lineno, zuul_event_id=None): repo = self.createRepoObject(zuul_event_id) diff --git a/zuul/model.py b/zuul/model.py index e339b1ffa..e526b749c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -64,6 +64,7 @@ MERGER_MAP = { 'squash-merge': MERGER_SQUASH_MERGE, 'rebase': MERGER_REBASE, } +ALL_MERGE_MODES = list(MERGER_MAP.values()) PRECEDENCE_NORMAL = 0 PRECEDENCE_LOW = 1 @@ -437,6 +438,8 @@ class Pipeline(object): # reconfigured). A pipeline requires a tenant in order to # reach the currently active layout for that tenant. 
self.tenant = tenant + self.allow_other_connections = True + self.connections = [] self.source_context = None self.start_mark = None self.description = None @@ -472,6 +475,8 @@ class Pipeline(object): self.window_decrease_factor = None self.state = None self.change_list = None + # Only used by the unit tests for assertions + self._exception_count = 0 @property def queues(self): @@ -615,6 +620,18 @@ class PipelineState(zkobject.ZKObject): _read_only=False, ) + def _lateInitData(self): + # If we're initializing the object on our initial refresh, + # reset the data to this. + return dict( + state=Pipeline.STATE_NORMAL, + queues=[], + old_queues=[], + consecutive_failures=0, + disabled=False, + layout_uuid=self.pipeline.tenant.layout.uuid, + ) + @classmethod def fromZK(klass, context, path, pipeline, **kw): obj = klass() @@ -626,21 +643,23 @@ class PipelineState(zkobject.ZKObject): return obj @classmethod - def create(cls, pipeline, layout_uuid, old_state=None): - # If the object does not exist in ZK, create it with the - # default attributes and the supplied layout UUID. Otherwise, - # return an initialized object (or the old object for reuse) - # without loading any data so that data can be loaded on the - # next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline, old_state=None): + # If we are resetting an existing pipeline, we will have an + # old_state, so just clean up the object references there and + # let the next refresh handle updating any data. + if old_state: + old_state._resetObjectRefs() + return old_state + + # Otherwise, we are initializing a pipeline that we haven't + # seen before. It still might exist in ZK, but since we + # haven't seen it, we don't have any object references to + # clean up. We can just start with a clean object, set the + # pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. 
state = cls() state._set(pipeline=pipeline) - if state.exists(ctx): - if old_state: - old_state._resetObjectRefs() - return old_state - return state - return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) + return state def _resetObjectRefs(self): # Update the pipeline references on the queue objects. @@ -707,8 +726,34 @@ class PipelineState(zkobject.ZKObject): # This is so that we can refresh the object in circumstances # where we haven't verified that our local layout matches # what's in ZK. + + # Notably, this need not prevent us from performing the + # initialization below if necessary. The case of the object + # being brand new in ZK supercedes our worry that our old copy + # might be out of date since our old copy is, itself, brand + # new. self._set(_read_only=read_only) - return super().refresh(context) + try: + return super().refresh(context) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and we + # should write it to ZK. + + # Note that typically this code is not used since + # currently other objects end up creating the pipeline + # path in ZK first. It is included in case that ever + # changes. Currently the empty byte-string code path in + # deserialize() is used instead. + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self._set(**self._lateInitData()) + self.internalCreate(context) def deserialize(self, raw, context): # We may have old change objects in the pipeline cache, so @@ -716,6 +761,20 @@ class PipelineState(zkobject.ZKObject): # source change cache. 
self.pipeline.manager.clearCache() + # If the object doesn't exist we will get back an empty byte + # string. This happens because the postConfig call creates + # this object without holding the pipeline lock, so it can't + # determine whether or not it exists in ZK. We do hold the + # pipeline lock here, so if we get the empty byte string, we + # know we're initializing the object. In that case, we should + # initialize the layout id to the current layout. Nothing + # else needs to be set. + if raw == b'': + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + return self._lateInitData() + data = super().deserialize(raw, context) if not self._read_only: @@ -890,11 +949,34 @@ class PipelineChangeList(zkobject.ShardedZKObject): super().__init__() self._set( changes=[], + _change_keys=[], ) - def refresh(self, context): - self._retry(context, super().refresh, - context, max_tries=5) + def refresh(self, context, allow_init=True): + # Set allow_init to false to indicate that we don't hold the + # lock and we should not try to initialize the object in ZK if + # it does not exist. + try: + self._retry(context, super().refresh, + context, max_tries=5) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and + # we should write it to ZK. + if allow_init: + context.log.warning( + "Initializing pipeline change list for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self.internalCreate(context) + else: + # If we're called from a context where we can't + # initialize the change list, re-raise the exception. 
+ raise def getPath(self): return self.getChangeListPath(self.pipeline) @@ -905,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject): return pipeline_path + '/change_list' @classmethod - def create(cls, pipeline, old_list=None): - # If the object does not exist in ZK, create it with the - # default attributes. Otherwise, return an initialized object - # (or the old object for reuse) without loading any data so - # that data can be loaded on the next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline): + # This object may or may not exist in ZK, but we using any of + # that data here. We can just start with a clean object, set + # the pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. change_list = cls() change_list._set(pipeline=pipeline) - if change_list.exists(ctx): - if old_list: - return old_list - return change_list - return cls.new(ctx, pipeline=pipeline) + return change_list def serialize(self, context): data = { @@ -925,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject): } return json.dumps(data, sort_keys=True).encode("utf8") - def deserialize(self, data, context): - data = super().deserialize(data, context) + def deserialize(self, raw, context): + data = super().deserialize(raw, context) change_keys = [] # We must have a dictionary with a 'changes' key; otherwise we # may be reading immediately after truncating. Allow the @@ -1805,24 +1882,6 @@ class FrozenSecret(ConfigObject): ) -class ProjectContext(ConfigObject): - - def __init__(self, project_canonical_name, project_name): - super().__init__() - self.project_canonical_name = project_canonical_name - self.project_name = project_name - self.branch = None - self.path = None - - def __str__(self): - return self.project_name - - def toDict(self): - return dict( - project=self.project_name, - ) - - class SourceContext(ConfigObject): """A reference to the branch of a project in configuration. 
@@ -2391,6 +2450,12 @@ class FrozenJob(zkobject.ZKObject): data['_' + job_data_key] = None return data + def _save(self, context, *args, **kw): + # Before saving, update the buildset with the new job version + # so that future readers know to refresh it. + self.buildset.updateJobVersion(context, self) + return super()._save(context, *args, **kw) + def setWaitingStatus(self, status): if self.waiting_status == status: return @@ -3720,6 +3785,27 @@ class BuildReference: self._path = _path +class BuildEvent: + TYPE_PAUSED = "paused" + TYPE_RESUMED = "resumed" + + def __init__(self, event_time, event_type, description=None): + self.event_time = event_time + self.event_type = event_type + self.description = description + + def toDict(self): + return { + "event_time": self.event_time, + "event_type": self.event_type, + "description": self.description, + } + + @classmethod + def fromDict(cls, data): + return cls(data["event_time"], data["event_type"], data["description"]) + + class Build(zkobject.ZKObject): """A Build is an instance of a single execution of a Job. @@ -3760,6 +3846,8 @@ class Build(zkobject.ZKObject): zuul_event_id=None, build_request_ref=None, span_info=None, + # A list of build events like paused, resume, ... + events=[], ) def serialize(self, context): @@ -3779,6 +3867,7 @@ class Build(zkobject.ZKObject): "zuul_event_id": self.zuul_event_id, "build_request_ref": self.build_request_ref, "span_info": self.span_info, + "events": [e.toDict() for e in self.events], } if COMPONENT_REGISTRY.model_api < 5: data["_result_data"] = (self._result_data.getPath() @@ -3801,6 +3890,11 @@ class Build(zkobject.ZKObject): def deserialize(self, raw, context): data = super().deserialize(raw, context) + # Deserialize build events + data["events"] = [ + BuildEvent.fromDict(e) for e in data.get("events", []) + ] + # Result data can change (between a pause and build # completion). 
@@ -3844,6 +3938,12 @@ class Build(zkobject.ZKObject): def getPath(self): return f"{self.job.getPath()}/build/{self.uuid}" + def _save(self, context, *args, **kw): + # Before saving, update the buildset with the new job version + # so that future readers know to refresh it. + self.job.buildset.updateBuildVersion(context, self) + return super()._save(context, *args, **kw) + def __repr__(self): return ('<Build %s of %s voting:%s>' % (self.uuid, self.job.name, self.job.voting)) @@ -3875,6 +3975,12 @@ class Build(zkobject.ZKObject): data=secret_result_data, _path=self.getPath() + '/secret_result_data') + def addEvent(self, event): + if not self._active_context: + raise Exception( + "addEvent must be used with a context manager") + self.events.append(event) + @property def failed(self): if self.result and self.result not in ['SUCCESS', 'SKIPPED']: @@ -4048,6 +4154,8 @@ class BuildSet(zkobject.ZKObject): job_graph=None, jobs={}, deduplicated_jobs=[], + job_versions={}, + build_versions={}, # Cached job graph of previous layout; not serialized _old_job_graph=None, _old_jobs={}, @@ -4159,6 +4267,8 @@ class BuildSet(zkobject.ZKObject): "configured_time": self.configured_time, "start_time": self.start_time, "repo_state_request_time": self.repo_state_request_time, + "job_versions": self.job_versions, + "build_versions": self.build_versions, # jobs (serialize as separate objects) } return json.dumps(data, sort_keys=True).encode("utf8") @@ -4256,7 +4366,8 @@ class BuildSet(zkobject.ZKObject): if job_name in self.jobs: job = self.jobs[job_name] - if not old_build_exists: + if ((not old_build_exists) or + self.shouldRefreshJob(job)): tpe_jobs.append((None, job_name, tpe.submit(job.refresh, context))) else: @@ -4268,7 +4379,8 @@ class BuildSet(zkobject.ZKObject): build = self.builds.get(job_name) builds[job_name] = build if build and build.getPath() == build_path: - if not build.result: + if ((not build.result) or + self.shouldRefreshBuild(build)): tpe_jobs.append(( None, 
job_name, tpe.submit( build.refresh, context))) @@ -4323,6 +4435,48 @@ class BuildSet(zkobject.ZKObject): }) return data + def updateBuildVersion(self, context, build): + # It's tempting to update versions regardless of the model + # API, but if we start writing versions before all components + # are upgraded we could get out of sync. + if (COMPONENT_REGISTRY.model_api < 12): + return True + + # It is common for a lot of builds/jobs to be added at once, + # so to avoid writing this buildset object repeatedly during + # that time, we only update the version after the initial + # creation. + version = build.getZKVersion() + # If zstat is None, we created the object + if version is not None: + self.build_versions[build.uuid] = version + 1 + self.updateAttributes(context, build_versions=self.build_versions) + + def updateJobVersion(self, context, job): + if (COMPONENT_REGISTRY.model_api < 12): + return True + + version = job.getZKVersion() + if version is not None: + self.job_versions[job.name] = version + 1 + self.updateAttributes(context, job_versions=self.job_versions) + + def shouldRefreshBuild(self, build): + # Unless all schedulers are updating versions, we can't trust + # the data. 
+ if (COMPONENT_REGISTRY.model_api < 12): + return True + current = build.getZKVersion() + expected = self.build_versions.get(build.uuid, 0) + return expected != current + + def shouldRefreshJob(self, job): + if (COMPONENT_REGISTRY.model_api < 12): + return True + current = job.getZKVersion() + expected = self.job_versions.get(job.name, 0) + return expected != current + @property def ref(self): # NOTE(jamielennox): The concept of buildset ref is to be removed and a @@ -4428,8 +4582,18 @@ class BuildSet(zkobject.ZKObject): with self.activeContext(self.item.pipeline.manager.current_context): self.node_requests[job_name] = request_id - def getJobNodeRequestID(self, job_name): - return self.node_requests.get(job_name) + def getJobNodeRequestID(self, job_name, ignore_deduplicate=False): + r = self.node_requests.get(job_name) + if ignore_deduplicate and isinstance(r, dict): + return None + return r + + def getNodeRequests(self): + # This ignores deduplicated node requests + for job_name, request in self.node_requests.items(): + if isinstance(request, dict): + continue + yield job_name, request def removeJobNodeRequestID(self, job_name): if job_name in self.node_requests: @@ -4502,6 +4666,37 @@ class BuildSet(zkobject.ZKObject): return Attributes(uuid=self.uuid) +class EventInfo: + + def __init__(self): + self.zuul_event_id = None + self.timestamp = time.time() + self.span_context = None + + @classmethod + def fromEvent(cls, event): + tinfo = cls() + tinfo.zuul_event_id = event.zuul_event_id + tinfo.timestamp = event.timestamp + tinfo.span_context = event.span_context + return tinfo + + @classmethod + def fromDict(cls, d): + tinfo = cls() + tinfo.zuul_event_id = d["zuul_event_id"] + tinfo.timestamp = d["timestamp"] + tinfo.span_context = d["span_context"] + return tinfo + + def toDict(self): + return { + "zuul_event_id": self.zuul_event_id, + "timestamp": self.timestamp, + "span_context": self.span_context, + } + + class QueueItem(zkobject.ZKObject): """Represents the 
position of a Change in a ChangeQueue. @@ -4536,7 +4731,7 @@ class QueueItem(zkobject.ZKObject): live=True, # Whether an item is intended to be processed at all layout_uuid=None, _cached_sql_results={}, - event=None, # The trigger event that lead to this queue item + event=None, # Info about the event that lead to this queue item # Additional container for connection specifig information to be # used by reporters throughout the lifecycle @@ -4558,6 +4753,9 @@ class QueueItem(zkobject.ZKObject): def new(klass, context, **kw): obj = klass() obj._set(**kw) + if COMPONENT_REGISTRY.model_api >= 13: + obj._set(event=obj.event and EventInfo.fromEvent(obj.event)) + data = obj._trySerialize(context) obj._save(context, data, create=True) files_state = (BuildSet.COMPLETE if obj.change.files is not None @@ -4586,10 +4784,18 @@ class QueueItem(zkobject.ZKObject): return (tenant, pipeline, uuid) def serialize(self, context): - if isinstance(self.event, TriggerEvent): - event_type = "TriggerEvent" + if COMPONENT_REGISTRY.model_api < 13: + if isinstance(self.event, TriggerEvent): + event_type = "TriggerEvent" + else: + event_type = self.event.__class__.__name__ else: - event_type = self.event.__class__.__name__ + event_type = "EventInfo" + if not isinstance(self.event, EventInfo): + # Convert our local trigger event to a trigger info + # object. This will only happen on the transition to + # model API version 13. + self._set(event=EventInfo.fromEvent(self.event)) data = { "uuid": self.uuid, @@ -4631,14 +4837,18 @@ class QueueItem(zkobject.ZKObject): # child objects. 
self._set(uuid=data["uuid"]) - event_type = data["event"]["type"] - if event_type == "TriggerEvent": - event_class = ( - self.pipeline.manager.sched.connections.getTriggerEventClass( - data["event"]["data"]["driver_name"]) - ) + if COMPONENT_REGISTRY.model_api < 13: + event_type = data["event"]["type"] + if event_type == "TriggerEvent": + event_class = ( + self.pipeline.manager.sched.connections + .getTriggerEventClass( + data["event"]["data"]["driver_name"]) + ) + else: + event_class = EventTypeIndex.event_type_mapping.get(event_type) else: - event_class = EventTypeIndex.event_type_mapping.get(event_type) + event_class = EventInfo if event_class is None: raise NotImplementedError( @@ -5860,8 +6070,7 @@ class Bundle: def deserialize(cls, context, queue, items_by_path, data): bundle = cls(data["uuid"]) bundle.items = [ - items_by_path.get(p) or QueueItem.fromZK( - context, p, pipeline=queue.pipeline, queue=queue) + items_by_path.get(p) or QueueItem.fromZK(context, p, queue=queue) for p in data["items"] ] bundle.started_reporting = data["started_reporting"] @@ -6968,6 +7177,7 @@ class TenantProjectConfig(object): self.extra_config_dirs = () # Load config from a different branch if this is a config project self.load_branch = None + self.merge_modes = None def isAlwaysDynamicBranch(self, branch): if self.always_dynamic_branches is None: @@ -6978,24 +7188,15 @@ class TenantProjectConfig(object): def includesBranch(self, branch): if self.include_branches is not None: - included = False for r in self.include_branches: if r.fullmatch(branch): - included = True - break - else: - included = True - if not included: + return True return False - excluded = False if self.exclude_branches is not None: for r in self.exclude_branches: if r.fullmatch(branch): - excluded = True - break - if excluded: - return False + return False return True diff --git a/zuul/model_api.py b/zuul/model_api.py index 9542cf415..0244296dd 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -13,5 
+13,5 @@ # under the License. # When making ZK schema changes, increment this and add a record to -# docs/developer/model-changelog.rst -MODEL_API = 10 +# doc/source/developer/model-changelog.rst +MODEL_API = 13 diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e30495d20..cd15a878c 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -23,6 +23,7 @@ import sys import threading import time import traceback +import urllib.parse import uuid from contextlib import suppress from zuul.vendor.contextlib import nullcontext @@ -53,6 +54,7 @@ from zuul.model import ( Abide, Build, BuildCompletedEvent, + BuildEvent, BuildPausedEvent, BuildStartedEvent, BuildStatusEvent, @@ -98,6 +100,8 @@ from zuul.zk.event_queues import ( PipelineManagementEventQueue, PipelineResultEventQueue, PipelineTriggerEventQueue, + PIPELINE_ROOT, + PIPELINE_NAME_ROOT, TENANT_ROOT, ) from zuul.zk.exceptions import LockException @@ -689,8 +693,8 @@ class Scheduler(threading.Thread): # In case we're in the middle of a reconfig, # include the old queue items. 
for item in pipeline.getAllItems(include_old=True): - nrs = item.current_build_set.node_requests - for req_id in (nrs.values()): + nrs = item.current_build_set.getNodeRequests() + for _, req_id in nrs: outstanding_requests.add(req_id) leaked_requests = zk_requests - outstanding_requests for req_id in leaked_requests: @@ -710,6 +714,7 @@ class Scheduler(threading.Thread): self._runMergerApiCleanup() self._runLayoutDataCleanup() self._runBlobStoreCleanup() + self._runLeakedPipelineCleanup() self.maintainConnectionCache() except Exception: self.log.exception("Error in general cleanup:") @@ -752,6 +757,54 @@ class Scheduler(threading.Thread): except Exception: self.log.exception("Error in layout data cleanup:") + def _runLeakedPipelineCleanup(self): + for tenant in self.abide.tenants.values(): + try: + with tenant_read_lock(self.zk_client, tenant.name, + blocking=False): + if not self.isTenantLayoutUpToDate(tenant.name): + self.log.debug( + "Skipping leaked pipeline cleanup for tenant %s", + tenant.name) + continue + valid_pipelines = tenant.layout.pipelines.values() + valid_state_paths = set( + p.state.getPath() for p in valid_pipelines) + valid_event_root_paths = set( + PIPELINE_NAME_ROOT.format( + tenant=p.tenant.name, pipeline=p.name) + for p in valid_pipelines) + + safe_tenant = urllib.parse.quote_plus(tenant.name) + state_root = f"/zuul/tenant/{safe_tenant}/pipeline" + event_root = PIPELINE_ROOT.format(tenant=tenant.name) + + all_state_paths = set( + f"{state_root}/{p}" for p in + self.zk_client.client.get_children(state_root)) + all_event_root_paths = set( + f"{event_root}/{p}" for p in + self.zk_client.client.get_children(event_root)) + + leaked_state_paths = all_state_paths - valid_state_paths + leaked_event_root_paths = ( + all_event_root_paths - valid_event_root_paths) + + for leaked_path in ( + leaked_state_paths | leaked_event_root_paths): + self.log.info("Removing leaked pipeline path %s", + leaked_path) + try: + self.zk_client.client.delete(leaked_path, 
+ recursive=True) + except Exception: + self.log.exception( + "Error removing leaked pipeline path %s in " + "tenant %s", leaked_path, tenant.name) + except LockException: + # We'll cleanup this tenant on the next iteration + pass + def _runBlobStoreCleanup(self): self.log.debug("Starting blob store cleanup") try: @@ -1291,6 +1344,21 @@ class Scheduler(threading.Thread): self.log.info("Local layout update complete for %s (duration: %s " "seconds)", tenant_name, duration) + def isTenantLayoutUpToDate(self, tenant_name): + remote_state = self.tenant_layout_state.get(tenant_name) + if remote_state is None: + # The tenant may still be in the + # process of initial configuration + self.wake_event.set() + return False + local_state = self.local_layout_state.get(tenant_name) + if local_state is None or remote_state > local_state: + self.log.debug("Local layout of tenant %s not up to date", + tenant_name) + self.layout_update_event.set() + return False + return True + def _checkTenantSourceConf(self, config): tenant_config = None script = False @@ -1460,7 +1528,7 @@ class Scheduler(threading.Thread): with self.createZKContext(lock, self.log) as ctx: if tenant is not None: self._reconfigureTenant(ctx, min_ltimes, - -1, + event.zuul_event_ltime, tenant, old_tenant) else: self._reconfigureDeleteTenant(ctx, old_tenant) @@ -1480,7 +1548,6 @@ class Scheduler(threading.Thread): # This is called in the scheduler loop after another thread submits # a request if self.unparsed_abide.ltime < self.system_config_cache.ltime: - self.log.debug("Updating system config") self.updateSystemConfig() with self.layout_lock: @@ -1632,7 +1699,7 @@ class Scheduler(threading.Thread): item.removeBuild(build) builds_to_cancel.append(build) for request_job, request in \ - item.current_build_set.node_requests.items(): + item.current_build_set.getNodeRequests(): new_job = item.getJob(request_job) if not new_job: requests_to_cancel.append( @@ -1654,7 +1721,7 @@ class Scheduler(threading.Thread): for build 
in item.current_build_set.getBuilds(): builds_to_cancel.append(build) for request_job, request in \ - item.current_build_set.node_requests.items(): + item.current_build_set.getNodeRequests(): requests_to_cancel.append( ( item.current_build_set, @@ -1690,12 +1757,25 @@ class Scheduler(threading.Thread): new_pipeline = tenant.layout.pipelines.get(name) if not new_pipeline: with old_pipeline.manager.currentContext(context): - self._reconfigureDeletePipeline(old_pipeline) + try: + self._reconfigureDeletePipeline(old_pipeline) + except Exception: + self.log.exception( + "Failed to cleanup deleted pipeline %s:", + old_pipeline) + self.management_events[tenant.name].initialize() + self.trigger_events[tenant.name].initialize() self.connections.reconfigureDrivers(tenant) # TODOv3(jeblair): remove postconfig calls? for pipeline in tenant.layout.pipelines.values(): + self.pipeline_management_events[tenant.name][ + pipeline.name].initialize() + self.pipeline_trigger_events[tenant.name][ + pipeline.name].initialize() + self.pipeline_result_events[tenant.name + ][pipeline.name].initialize() for trigger in pipeline.triggers: trigger.postConfig(pipeline) for reporter in pipeline.actions: @@ -1753,7 +1833,11 @@ class Scheduler(threading.Thread): (tenant,)) for pipeline in tenant.layout.pipelines.values(): with pipeline.manager.currentContext(context): - self._reconfigureDeletePipeline(pipeline) + try: + self._reconfigureDeletePipeline(pipeline) + except Exception: + self.log.exception( + "Failed to cleanup deleted pipeline %s:", pipeline) # Delete the tenant root path for this tenant in ZooKeeper to remove # all tenant specific event queues @@ -1769,45 +1853,80 @@ class Scheduler(threading.Thread): def _reconfigureDeletePipeline(self, pipeline): self.log.info("Removing pipeline %s during reconfiguration" % (pipeline,)) - for shared_queue in pipeline.queues: - builds_to_cancel = [] - requests_to_cancel = [] - for item in shared_queue.queue: - with 
item.activeContext(pipeline.manager.current_context): - item.item_ahead = None - item.items_behind = [] - self.log.info( - "Removing item %s during reconfiguration" % (item,)) - for build in item.current_build_set.getBuilds(): - builds_to_cancel.append(build) - for request_job, request in \ - item.current_build_set.node_requests.items(): - requests_to_cancel.append( - ( - item.current_build_set, - request, - item.getJob(request_job), - ) + + ctx = pipeline.manager.current_context + pipeline.state.refresh(ctx) + + builds_to_cancel = [] + requests_to_cancel = [] + for item in pipeline.getAllItems(): + with item.activeContext(pipeline.manager.current_context): + item.item_ahead = None + item.items_behind = [] + self.log.info( + "Removing item %s during reconfiguration" % (item,)) + for build in item.current_build_set.getBuilds(): + builds_to_cancel.append(build) + for request_job, request in \ + item.current_build_set.getNodeRequests(): + requests_to_cancel.append( + ( + item.current_build_set, + request, + item.getJob(request_job), ) - try: - self.sql.reportBuildsetEnd( - item.current_build_set, 'dequeue', - final=False, result='DEQUEUED') - except Exception: - self.log.exception( - "Error reporting buildset completion to DB:") + ) + try: + self.sql.reportBuildsetEnd( + item.current_build_set, 'dequeue', + final=False, result='DEQUEUED') + except Exception: + self.log.exception( + "Error reporting buildset completion to DB:") - for build in builds_to_cancel: - self.log.info( - "Canceling build %s during reconfiguration" % (build,)) + for build in builds_to_cancel: + self.log.info( + "Canceling build %s during reconfiguration", build) + try: self.cancelJob(build.build_set, build.job, build=build, force=True) - for build_set, request, request_job in requests_to_cancel: - self.log.info( - "Canceling node request %s during reconfiguration", - request) + except Exception: + self.log.exception( + "Error canceling build %s during reconfiguration", build) + for build_set, 
request, request_job in requests_to_cancel: + self.log.info( + "Canceling node request %s during reconfiguration", request) + try: self.cancelJob(build_set, request_job, force=True) - shared_queue.delete(pipeline.manager.current_context) + except Exception: + self.log.exception( + "Error canceling node request %s during reconfiguration", + request) + + # Delete the pipeline event root path in ZooKeeper to remove + # all pipeline specific event queues. + try: + self.zk_client.client.delete( + PIPELINE_NAME_ROOT.format( + tenant=pipeline.tenant.name, + pipeline=pipeline.name), + recursive=True) + except Exception: + # In case a pipeline event has been submitted during + # reconfiguration this cleanup will fail. + self.log.exception( + "Error removing event queues for deleted pipeline %s in " + "tenant %s", pipeline.name, pipeline.tenant.name) + + # Delete the pipeline root path in ZooKeeper to remove all pipeline + # state. + try: + self.zk_client.client.delete(pipeline.state.getPath(), + recursive=True) + except Exception: + self.log.exception( + "Error removing state for deleted pipeline %s in tenant %s", + pipeline.name, pipeline.tenant.name) def _doPromoteEvent(self, event): tenant = self.abide.tenants.get(event.tenant_name) @@ -2006,84 +2125,74 @@ class Scheduler(threading.Thread): return self.log.debug("Run handler awake") self.run_handler_lock.acquire() - try: - if not self._stopped: - self.process_reconfigure_queue() + with self.statsd_timer("zuul.scheduler.run_handler"): + try: + self._run() + except Exception: + self.log.exception("Exception in run handler:") + # There may still be more events to process + self.wake_event.set() + finally: + self.run_handler_lock.release() - if self.unparsed_abide.ltime < self.system_config_cache.ltime: - self.updateSystemConfig() + def _run(self): + if not self._stopped: + self.process_reconfigure_queue() - for tenant_name in self.unparsed_abide.tenants: - if self._stopped: - break + if self.unparsed_abide.ltime < 
self.system_config_cache.ltime: + self.updateSystemConfig() - tenant = self.abide.tenants.get(tenant_name) - if not tenant: - continue + for tenant_name in self.unparsed_abide.tenants: + if self._stopped: + break - # This will also forward events for the pipelines - # (e.g. enqueue or dequeue events) to the matching - # pipeline event queues that are processed afterwards. - self.process_tenant_management_queue(tenant) + tenant = self.abide.tenants.get(tenant_name) + if not tenant: + continue - if self._stopped: - break + # This will also forward events for the pipelines + # (e.g. enqueue or dequeue events) to the matching + # pipeline event queues that are processed afterwards. + self.process_tenant_management_queue(tenant) - try: - with tenant_read_lock( - self.zk_client, tenant_name, blocking=False - ) as tlock: - remote_state = self.tenant_layout_state.get( - tenant_name) - if remote_state is None: - # The tenant may still be in the - # process of initial configuration - self.wake_event.set() - continue - local_state = self.local_layout_state.get( - tenant_name) - if (local_state is None or - remote_state > local_state): - self.log.debug( - "Local layout of tenant %s not up to date", - tenant.name) - self.layout_update_event.set() - continue + if self._stopped: + break - # Get tenant again, as it might have been updated - # by a tenant reconfig or layout change. - tenant = self.abide.tenants[tenant_name] + try: + with tenant_read_lock( + self.zk_client, tenant_name, blocking=False + ) as tlock: + if not self.isTenantLayoutUpToDate(tenant_name): + continue - if not self._stopped: - # This will forward trigger events to pipeline - # event queues that are processed below. - self.process_tenant_trigger_queue(tenant) + # Get tenant again, as it might have been updated + # by a tenant reconfig or layout change. 
+ tenant = self.abide.tenants[tenant_name] - self.process_pipelines(tenant, tlock) - except LockException: - self.log.debug("Skipping locked tenant %s", - tenant.name) - remote_state = self.tenant_layout_state.get( - tenant_name) - local_state = self.local_layout_state.get( - tenant_name) - if (remote_state is None or - local_state is None or - remote_state > local_state): - # Let's keep looping until we've updated to the - # latest tenant layout. - self.wake_event.set() - except Exception: - self.log.exception("Exception processing tenant %s:", - tenant_name) - # There may still be more events to process - self.wake_event.set() + if not self._stopped: + # This will forward trigger events to pipeline + # event queues that are processed below. + self.process_tenant_trigger_queue(tenant) + + self.process_pipelines(tenant, tlock) + except LockException: + self.log.debug("Skipping locked tenant %s", + tenant.name) + remote_state = self.tenant_layout_state.get( + tenant_name) + local_state = self.local_layout_state.get( + tenant_name) + if (remote_state is None or + local_state is None or + remote_state > local_state): + # Let's keep looping until we've updated to the + # latest tenant layout. 
+ self.wake_event.set() except Exception: - self.log.exception("Exception in run handler:") + self.log.exception("Exception processing tenant %s:", + tenant_name) # There may still be more events to process self.wake_event.set() - finally: - self.run_handler_lock.release() def primeSystemConfig(self): with self.layout_lock: @@ -2101,6 +2210,7 @@ class Scheduler(threading.Thread): def updateSystemConfig(self): with self.layout_lock: + self.log.debug("Updating system config") self.unparsed_abide, self.globals = self.system_config_cache.get() self.ansible_manager = AnsibleManager( default_version=self.globals.default_ansible_version) @@ -2135,7 +2245,12 @@ class Scheduler(threading.Thread): self.zk_client, tenant.name, pipeline.name, blocking=False) as lock,\ self.createZKContext(lock, self.log) as ctx: + self.log.debug("Processing pipeline %s in tenant %s", + pipeline.name, tenant.name) with pipeline.manager.currentContext(ctx): + if ((tenant.name, pipeline.name) in + self._profile_pipelines): + ctx.profile = True with self.statsd_timer(f'{stats_key}.handling'): refreshed = self._process_pipeline( tenant, pipeline) @@ -2204,14 +2319,10 @@ class Scheduler(threading.Thread): stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}' ctx = pipeline.manager.current_context - if (tenant.name, pipeline.name) in self._profile_pipelines: - ctx.profile = True with self.statsd_timer(f'{stats_key}.refresh'): pipeline.change_list.refresh(ctx) pipeline.summary.refresh(ctx) pipeline.state.refresh(ctx) - if (tenant.name, pipeline.name) in self._profile_pipelines: - ctx.profile = False pipeline.state.setDirty(self.zk_client.client) if pipeline.state.old_queues: @@ -2230,6 +2341,7 @@ class Scheduler(threading.Thread): pass except Exception: self.log.exception("Exception in pipeline processing:") + pipeline._exception_count += 1 pipeline.state.updateAttributes( ctx, state=pipeline.STATE_ERROR) # Continue processing other pipelines+tenants @@ -2247,7 +2359,9 @@ class 
Scheduler(threading.Thread): for pipeline in tenant.layout.pipelines.values(): self.log.debug("Gather relevant cache items for: %s %s", tenant.name, pipeline.name) - pipeline.change_list.refresh(ctx) + # This will raise an exception and abort the process if + # unable to refresh the change list. + pipeline.change_list.refresh(ctx, allow_init=False) change_keys = pipeline.change_list.getChangeKeys() relevant_changes = pipeline.manager.resolveChangeKeys( change_keys) @@ -2273,11 +2387,21 @@ class Scheduler(threading.Thread): with trigger_queue_lock( self.zk_client, tenant.name, blocking=False ): + self.log.debug("Processing tenant trigger events in %s", + tenant.name) # Update the pipeline changes ctx = self.createZKContext(None, self.log) for pipeline in tenant.layout.pipelines.values(): + # This will raise an exception if it is unable to + # refresh the change list. We will proceed anyway + # and use our data from the last time we did + # refresh in order to avoid stalling trigger + # processing. In this case we may not forward + # some events which are related to changes in the + # pipeline but don't match the pipeline trigger + # criteria. try: - pipeline.change_list.refresh(ctx) + pipeline.change_list.refresh(ctx, allow_init=False) except Exception: self.log.exception( "Unable to refresh pipeline change list for %s", @@ -2395,9 +2519,26 @@ class Scheduler(threading.Thread): event.span_context = tracing.getSpanContext(span) for pipeline in tenant.layout.pipelines.values(): + # For most kinds of dependencies, it's sufficient to check + # if this change is already in the pipeline, because the + # only way to update a dependency cycle is to update one + # of the changes in it. However, dependencies-by-topic + # can have changes added to the cycle without updating any + # of the existing changes in the cycle. 
That means in + # order to detect whether a new change is added to an + # existing cycle in the pipeline, we need to know all of + # the dependencies of the new change, and check if *they* + # are in the pipeline. Therefore, go ahead and update our + # dependencies here so they are available for comparison + # against the pipeline contents. This front-loads some + # work that otherwise would happen in the pipeline + # manager, but the result of the work goes into the change + # cache, so it's not wasted; it's just less parallelized. + if isinstance(change, Change): + pipeline.manager.updateCommitDependencies(change, event) if ( pipeline.manager.eventMatches(event, change) - or pipeline.manager.isAnyVersionOfChangeInPipeline(change) + or pipeline.manager.isChangeRelevantToPipeline(change) ): self.pipeline_trigger_events[tenant.name][ pipeline.name @@ -2468,6 +2609,13 @@ class Scheduler(threading.Thread): with management_queue_lock( self.zk_client, tenant.name, blocking=False ): + if not self.isTenantLayoutUpToDate(tenant.name): + self.log.debug( + "Skipping management event queue for tenant %s", + tenant.name) + return + self.log.debug("Processing tenant management events in %s", + tenant.name) self._process_tenant_management_queue(tenant) except LockException: self.log.debug("Skipping locked management event queue" @@ -2698,6 +2846,10 @@ class Scheduler(threading.Thread): # with child job skipping. with build.activeContext(pipeline.manager.current_context): build.paused = True + build.addEvent( + BuildEvent( + event_time=time.time(), + event_type=BuildEvent.TYPE_PAUSED)) build.setResultData( event.data.get("data", {}), event.data.get("secret_data", {})) @@ -2825,7 +2977,8 @@ class Scheduler(threading.Thread): # In case the build didn't show up on any executor, the node # request does still exist, so we have to make sure it is # removed from ZK. 
- request_id = build.build_set.getJobNodeRequestID(build.job.name) + request_id = build.build_set.getJobNodeRequestID( + build.job.name, ignore_deduplicate=True) if request_id: self.nodepool.deleteNodeRequest( request_id, event_id=build.zuul_event_id) @@ -2926,9 +3079,10 @@ class Scheduler(threading.Thread): # Cancel node request if needed req_id = buildset.getJobNodeRequestID(job_name) if req_id: - req = self.nodepool.zk_nodepool.getNodeRequest(req_id) - if req: - self.nodepool.cancelRequest(req) + if not isinstance(req_id, dict): + req = self.nodepool.zk_nodepool.getNodeRequest(req_id) + if req: + self.nodepool.cancelRequest(req) buildset.removeJobNodeRequestID(job_name) # Cancel build if needed diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py index faf97b48b..c2487af88 100644 --- a/zuul/source/__init__.py +++ b/zuul/source/__init__.py @@ -15,6 +15,8 @@ import abc import time +from zuul import model + class BaseSource(object, metaclass=abc.ABCMeta): """Base class for sources. @@ -95,7 +97,7 @@ class BaseSource(object, metaclass=abc.ABCMeta): # info on subsequent requests we can continue to do the # requested job work. try: - dep = self.getChangeByURL(url, event) + return self.getChangeByURL(url, event) except Exception: # Note that if the change isn't found dep is None. # We do not raise in that case and do not need to handle it @@ -107,7 +109,6 @@ class BaseSource(object, metaclass=abc.ABCMeta): time.sleep(1) else: raise - return dep @abc.abstractmethod def getChangesDependingOn(self, change, projects, tenant): @@ -183,6 +184,20 @@ class BaseSource(object, metaclass=abc.ABCMeta): """ + def getProjectMergeModes(self, project, tenant, min_ltime=-1): + """Get supported merge modes for a project + + This method is called very frequently, and should generally + return quickly. The connection is expected to cache merge + modes for all projects queried. + + The default implementation indicates that all merge modes are + supported. 
+ + """ + + return model.ALL_MERGE_MODES + @abc.abstractmethod def getProjectBranchCacheLtime(self): """Return the current ltime of the project branch cache.""" diff --git a/zuul/version.py b/zuul/version.py index eafa6c2c7..80512842b 100644 --- a/zuul/version.py +++ b/zuul/version.py @@ -17,11 +17,10 @@ import json -import pbr.version +from importlib import metadata as importlib_metadata import pkg_resources -version_info = pbr.version.VersionInfo('zuul') -release_string = version_info.release_string() +release_string = importlib_metadata.distribution('zuul').version is_release = None git_version = None diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index 7f27cd970..47226fd7d 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -395,7 +395,7 @@ class LogStreamer(object): self.finger_socket = socket.create_connection( (server, port), timeout=10) if use_ssl: - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = self.zuulweb.finger_tls_verify_hostnames context.load_cert_chain( diff --git a/zuul/zk/branch_cache.py b/zuul/zk/branch_cache.py index 6fa531fad..0a8872158 100644 --- a/zuul/zk/branch_cache.py +++ b/zuul/zk/branch_cache.py @@ -13,11 +13,15 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. 
+ +import collections import logging import json from zuul.zk.zkobject import ZKContext, ShardedZKObject from zuul.zk.locks import SessionAwareReadLock, SessionAwareWriteLock, locked +from zuul.zk.components import COMPONENT_REGISTRY +from zuul import model from kazoo.exceptions import NoNodeError @@ -63,15 +67,28 @@ class BranchCacheZKObject(ShardedZKObject): def __init__(self): super().__init__() self._set(protected={}, - remainder={}) + remainder={}, + merge_modes={}) def serialize(self, context): data = { "protected": self.protected, "remainder": self.remainder, } + # This is mostly here to enable unit tests of upgrades, it's + # safe to move into the dict above at any time. + if (COMPONENT_REGISTRY.model_api >= 11): + data["merge_modes"] = self.merge_modes return json.dumps(data, sort_keys=True).encode("utf8") + def deserialize(self, raw, context): + data = super().deserialize(raw, context) + # MODEL_API < 11 + if "merge_modes" not in data: + data["merge_modes"] = collections.defaultdict( + lambda: model.ALL_MERGE_MODES) + return data + def _save(self, context, data, create=False): super()._save(context, data, create) zstat = context.client.exists(self.getPath()) @@ -119,10 +136,12 @@ class BranchCache: if projects is None: self.cache.protected.clear() self.cache.remainder.clear() + self.cache.merge_modes.clear() else: for p in projects: self.cache.protected.pop(p, None) self.cache.remainder.pop(p, None) + self.cache.merge_modes.pop(p, None) def getProjectBranches(self, project_name, exclude_unprotected, min_ltime=-1, default=RAISE_EXCEPTION): @@ -255,6 +274,68 @@ class BranchCache: if branch not in remainder_branches: remainder_branches.append(branch) + def getProjectMergeModes(self, project_name, + min_ltime=-1, default=RAISE_EXCEPTION): + """Get the merge modes for the given project. + + Checking the branch cache we need to distinguish three different + cases: + + 1. cache miss (not queried yet) + 2. cache hit (including empty list of merge modes) + 3. 
error when fetching merge modes + + If the cache doesn't contain any merge modes for the project and no + default value is provided a LookupError is raised. + + If there was an error fetching the merge modes, the return value + will be None. + + Otherwise the list of merge modes will be returned. + + :param str project_name: + The project for which the merge modes are returned. + :param int min_ltime: + The minimum cache ltime to consider the cache valid. + :param any default: + Optional default value to return if no cache entry exits. + + :returns: The list of merge modes by model id, or None if there was + an error when fetching the merge modes. + """ + if self.ltime < min_ltime: + with locked(self.rlock): + self.cache.refresh(self.zk_context) + + merge_modes = None + try: + merge_modes = self.cache.merge_modes[project_name] + except KeyError: + if default is RAISE_EXCEPTION: + raise LookupError( + f"No merge modes for project {project_name}") + else: + return default + + return merge_modes + + def setProjectMergeModes(self, project_name, merge_modes): + """Set the supported merge modes for the given project. + + Use None as a sentinel value for the merge modes to indicate + that there was a fetch error. + + :param str project_name: + The project for the merge modes. + :param list[int] merge_modes: + The list of merge modes (by model ID) or None. 
+ + """ + + with locked(self.wlock): + with self.cache.activeContext(self.zk_context): + self.cache.merge_modes[project_name] = merge_modes + @property def ltime(self): return self.cache._zstat.last_modified_transaction_id diff --git a/zuul/zk/config_cache.py b/zuul/zk/config_cache.py index 45e355c83..4fcfe94c8 100644 --- a/zuul/zk/config_cache.py +++ b/zuul/zk/config_cache.py @@ -20,10 +20,10 @@ from collections.abc import MutableMapping from urllib.parse import quote_plus, unquote_plus from kazoo.exceptions import NoNodeError +from kazoo.recipe import lock from zuul import model from zuul.zk import sharding, ZooKeeperSimpleBase -from zuul.zk.vendor import lock CONFIG_ROOT = "/zuul/config" diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py index ebb33ec88..ce38cf23e 100644 --- a/zuul/zk/event_queues.py +++ b/zuul/zk/event_queues.py @@ -144,6 +144,17 @@ class EventWatcher(ZooKeeperSimpleBase): self.watched_tenants.add(tenant_name) def _pipelineWatch(self, tenant_name, pipelines): + # Remove pipelines that no longer exists from the watch list so + # we re-register the children watch in case the pipeline is + # added again. 
+ for watched_tenant, pipeline_name in list(self.watched_pipelines): + if watched_tenant != tenant_name: + continue + if pipeline_name in pipelines: + continue + with suppress(KeyError): + self.watched_pipelines.remove((tenant_name, pipeline_name)) + for pipeline_name in pipelines: key = (tenant_name, pipeline_name) if key in self.watched_pipelines: @@ -205,6 +216,9 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable): self.queue_root = queue_root self.event_root = f'{queue_root}/queue' self.data_root = f'{queue_root}/data' + self.initialize() + + def initialize(self): self.kazoo_client.ensure_path(self.event_root) self.kazoo_client.ensure_path(self.data_root) diff --git a/zuul/zk/locks.py b/zuul/zk/locks.py index ade25dd75..ff098c5c5 100644 --- a/zuul/zk/locks.py +++ b/zuul/zk/locks.py @@ -17,9 +17,9 @@ from contextlib import contextmanager from urllib.parse import quote_plus from kazoo.protocol.states import KazooState +from kazoo.recipe.lock import Lock, ReadLock, WriteLock from zuul.zk.exceptions import LockException -from zuul.zk.vendor.lock import Lock, ReadLock, WriteLock LOCK_ROOT = "/zuul/locks" TENANT_LOCK_ROOT = f"{LOCK_ROOT}/tenant" diff --git a/zuul/zk/vendor/lock.py b/zuul/zk/vendor/lock.py deleted file mode 100644 index fb387f70b..000000000 --- a/zuul/zk/vendor/lock.py +++ /dev/null @@ -1,759 +0,0 @@ -# This file is from the Kazoo project and contains fixes proposed in -# https://github.com/python-zk/kazoo/pull/650 -# -# https://github.com/python-zk/kazoo/blob/master/kazoo/recipe/lock.py -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Zookeeper Locking Implementations - -:Maintainer: Ben Bangert <ben@groovie.org> -:Status: Production - -Error Handling -============== - -It's highly recommended to add a state listener with -:meth:`~KazooClient.add_listener` and watch for -:attr:`~KazooState.LOST` and :attr:`~KazooState.SUSPENDED` state -changes and re-act appropriately. In the event that a -:attr:`~KazooState.LOST` state occurs, its certain that the lock -and/or the lease has been lost. - -""" -import re -import sys - -try: - from time import monotonic as now -except ImportError: - from time import time as now -import uuid - -import six - -from kazoo.exceptions import ( - CancelledError, - KazooException, - LockTimeout, - NoNodeError, -) -from kazoo.protocol.states import KazooState -from kazoo.retry import ( - ForceRetryError, - KazooRetry, - RetryFailedError, -) - - -class _Watch(object): - def __init__(self, duration=None): - self.duration = duration - self.started_at = None - - def start(self): - self.started_at = now() - - def leftover(self): - if self.duration is None: - return None - else: - elapsed = now() - self.started_at - return max(0, self.duration - elapsed) - - -class Lock(object): - """Kazoo Lock - - Example usage with a :class:`~kazoo.client.KazooClient` instance: - - .. code-block:: python - - zk = KazooClient() - zk.start() - lock = zk.Lock("/lockpath", "my-identifier") - with lock: # blocks waiting for lock acquisition - # do something with the lock - - Note: This lock is not *re-entrant*. Repeated calls after already - acquired will block. - - This is an exclusive lock. For a read/write lock, see :class:`WriteLock` - and :class:`ReadLock`. - - """ - - # Node name, after the contender UUID, before the sequence - # number. Involved in read/write locks. - _NODE_NAME = "__lock__" - - # Node names which exclude this contender when present at a lower - # sequence number. 
Involved in read/write locks. - _EXCLUDE_NAMES = ["__lock__"] - - def __init__(self, client, path, identifier=None, extra_lock_patterns=()): - """Create a Kazoo lock. - - :param client: A :class:`~kazoo.client.KazooClient` instance. - :param path: The lock path to use. - :param identifier: Name to use for this lock contender. This can be - useful for querying to see who the current lock - contenders are. - :param extra_lock_patterns: Strings that will be used to - identify other znode in the path - that should be considered contenders - for this lock. - Use this for cross-implementation - compatibility. - - .. versionadded:: 2.7.1 - The extra_lock_patterns option. - """ - self.client = client - self.path = path - self._exclude_names = set( - self._EXCLUDE_NAMES + list(extra_lock_patterns) - ) - self._contenders_re = re.compile( - r"(?:{patterns})(-?\d{{10}})$".format( - patterns="|".join(self._exclude_names) - ) - ) - - # some data is written to the node. this can be queried via - # contenders() to see who is contending for the lock - self.data = str(identifier or "").encode("utf-8") - self.node = None - - self.wake_event = client.handler.event_object() - - # props to Netflix Curator for this trick. It is possible for our - # create request to succeed on the server, but for a failure to - # prevent us from getting back the full path name. We prefix our - # lock name with a uuid and can check for its presence on retry. 
- self.prefix = uuid.uuid4().hex + self._NODE_NAME - self.create_path = self.path + "/" + self.prefix - - self.create_tried = False - self.is_acquired = False - self.assured_path = False - self.cancelled = False - self._retry = KazooRetry( - max_tries=None, sleep_func=client.handler.sleep_func - ) - self._lock = client.handler.lock_object() - - def _ensure_path(self): - self.client.ensure_path(self.path) - self.assured_path = True - - def cancel(self): - """Cancel a pending lock acquire.""" - self.cancelled = True - self.wake_event.set() - - def acquire(self, blocking=True, timeout=None, ephemeral=True): - """ - Acquire the lock. By defaults blocks and waits forever. - - :param blocking: Block until lock is obtained or return immediately. - :type blocking: bool - :param timeout: Don't wait forever to acquire the lock. - :type timeout: float or None - :param ephemeral: Don't use ephemeral znode for the lock. - :type ephemeral: bool - - :returns: Was the lock acquired? - :rtype: bool - - :raises: :exc:`~kazoo.exceptions.LockTimeout` if the lock - wasn't acquired within `timeout` seconds. - - .. warning:: - - When :attr:`ephemeral` is set to False session expiration - will not release the lock and must be handled separately. - - .. versionadded:: 1.1 - The timeout option. - - .. versionadded:: 2.4.1 - The ephemeral option. - """ - - def _acquire_lock(): - got_it = self._lock.acquire(False) - if not got_it: - raise ForceRetryError() - return True - - retry = self._retry.copy() - retry.deadline = timeout - - # Ensure we are locked so that we avoid multiple threads in - # this acquistion routine at the same time... - locked = self._lock.acquire(False) - if not locked and not blocking: - return False - if not locked: - # Lock acquire doesn't take a timeout, so simulate it... 
- # XXX: This is not true in Py3 >= 3.2 - try: - locked = retry(_acquire_lock) - except RetryFailedError: - return False - already_acquired = self.is_acquired - try: - gotten = False - try: - gotten = retry( - self._inner_acquire, - blocking=blocking, - timeout=timeout, - ephemeral=ephemeral, - ) - except RetryFailedError: - pass - except KazooException: - # if we did ultimately fail, attempt to clean up - exc_info = sys.exc_info() - if not already_acquired: - self._best_effort_cleanup() - self.cancelled = False - six.reraise(exc_info[0], exc_info[1], exc_info[2]) - if gotten: - self.is_acquired = gotten - if not gotten and not already_acquired: - self._best_effort_cleanup() - return gotten - finally: - self._lock.release() - - def _watch_session(self, state): - self.wake_event.set() - return True - - def _inner_acquire(self, blocking, timeout, ephemeral=True): - - # wait until it's our chance to get it.. - if self.is_acquired: - if not blocking: - return False - raise ForceRetryError() - - # make sure our election parent node exists - if not self.assured_path: - self._ensure_path() - - node = None - if self.create_tried: - node = self._find_node() - else: - self.create_tried = True - - if not node: - node = self.client.create( - self.create_path, self.data, ephemeral=ephemeral, sequence=True - ) - # strip off path to node - node = node[len(self.path) + 1:] - - self.node = node - - while True: - self.wake_event.clear() - - # bail out with an exception if cancellation has been requested - if self.cancelled: - raise CancelledError() - - predecessor = self._get_predecessor(node) - if predecessor is None: - return True - - if not blocking: - return False - - # otherwise we are in the mix. 
watch predecessor and bide our time - predecessor = self.path + "/" + predecessor - self.client.add_listener(self._watch_session) - try: - self.client.get(predecessor, self._watch_predecessor) - except NoNodeError: - pass # predecessor has already been deleted - else: - self.wake_event.wait(timeout) - if not self.wake_event.isSet(): - raise LockTimeout( - "Failed to acquire lock on %s after %s seconds" - % (self.path, timeout) - ) - finally: - self.client.remove_listener(self._watch_session) - - def _watch_predecessor(self, event): - self.wake_event.set() - - def _get_predecessor(self, node): - """returns `node`'s predecessor or None - - Note: This handle the case where the current lock is not a contender - (e.g. rlock), this and also edge cases where the lock's ephemeral node - is gone. - """ - node_sequence = node[len(self.prefix):] - children = self.client.get_children(self.path) - found_self = False - # Filter out the contenders using the computed regex - contender_matches = [] - for child in children: - match = self._contenders_re.search(child) - if match is not None: - contender_sequence = match.group(1) - # Only consider contenders with a smaller sequence number. - # A contender with a smaller sequence number has a higher - # priority. - if contender_sequence < node_sequence: - contender_matches.append(match) - if child == node: - # Remember the node's match object so we can short circuit - # below. - found_self = match - - if found_self is False: # pragma: nocover - # somehow we aren't in the childrens -- probably we are - # recovering from a session failure and our ephemeral - # node was removed. - raise ForceRetryError() - - if not contender_matches: - return None - - # Sort the contenders using the sequence number extracted by the regex - # and return the original string of the predecessor. 
- sorted_matches = sorted(contender_matches, key=lambda m: m.groups()) - return sorted_matches[-1].string - - def _find_node(self): - children = self.client.get_children(self.path) - for child in children: - if child.startswith(self.prefix): - return child - return None - - def _delete_node(self, node): - self.client.delete(self.path + "/" + node) - - def _best_effort_cleanup(self): - try: - node = self.node or self._find_node() - if node: - self._delete_node(node) - except KazooException: # pragma: nocover - pass - - def release(self): - """Release the lock immediately.""" - return self.client.retry(self._inner_release) - - def _inner_release(self): - if not self.is_acquired: - return False - - try: - self._delete_node(self.node) - except NoNodeError: # pragma: nocover - pass - - self.is_acquired = False - self.node = None - return True - - def contenders(self): - """Return an ordered list of the current contenders for the - lock. - - .. note:: - - If the contenders did not set an identifier, it will appear - as a blank string. - - """ - # make sure our election parent node exists - if not self.assured_path: - self._ensure_path() - - children = self.client.get_children(self.path) - # We want all contenders, including self (this is especially important - # for r/w locks). This is similar to the logic of `_get_predecessor` - # except we include our own pattern. - all_contenders_re = re.compile( - r"(?:{patterns})(-?\d{{10}})$".format( - patterns="|".join(self._exclude_names | {self._NODE_NAME}) - ) - ) - # Filter out the contenders using the computed regex - contender_matches = [] - for child in children: - match = all_contenders_re.search(child) - if match is not None: - contender_matches.append(match) - # Sort the contenders using the sequence number extracted by the regex, - # then extract the original string. 
- contender_nodes = [ - match.string - for match in sorted(contender_matches, key=lambda m: m.groups()) - ] - # Retrieve all the contender nodes data (preserving order). - contenders = [] - for node in contender_nodes: - try: - data, stat = self.client.get(self.path + "/" + node) - if data is not None: - contenders.append(data.decode("utf-8")) - except NoNodeError: # pragma: nocover - pass - - return contenders - - def __enter__(self): - self.acquire() - - def __exit__(self, exc_type, exc_value, traceback): - self.release() - - -class WriteLock(Lock): - """Kazoo Write Lock - - Example usage with a :class:`~kazoo.client.KazooClient` instance: - - .. code-block:: python - - zk = KazooClient() - zk.start() - lock = zk.WriteLock("/lockpath", "my-identifier") - with lock: # blocks waiting for lock acquisition - # do something with the lock - - The lock path passed to WriteLock and ReadLock must match for them to - communicate. The write lock can not be acquired if it is held by - any readers or writers. - - Note: This lock is not *re-entrant*. Repeated calls after already - acquired will block. - - This is the write-side of a shared lock. See :class:`Lock` for a - standard exclusive lock and :class:`ReadLock` for the read-side of a - shared lock. - - """ - - _NODE_NAME = "__lock__" - _EXCLUDE_NAMES = ["__lock__", "__rlock__"] - - -class ReadLock(Lock): - """Kazoo Read Lock - - Example usage with a :class:`~kazoo.client.KazooClient` instance: - - .. code-block:: python - - zk = KazooClient() - zk.start() - lock = zk.ReadLock("/lockpath", "my-identifier") - with lock: # blocks waiting for outstanding writers - # do something with the lock - - The lock path passed to WriteLock and ReadLock must match for them to - communicate. The read lock blocks if it is held by any writers, - but multiple readers may hold the lock. - - Note: This lock is not *re-entrant*. Repeated calls after already - acquired will block. - - This is the read-side of a shared lock. 
See :class:`Lock` for a - standard exclusive lock and :class:`WriteLock` for the write-side of a - shared lock. - - """ - - _NODE_NAME = "__rlock__" - _EXCLUDE_NAMES = ["__lock__"] - - -class Semaphore(object): - """A Zookeeper-based Semaphore - - This synchronization primitive operates in the same manner as the - Python threading version only uses the concept of leases to - indicate how many available leases are available for the lock - rather than counting. - - Note: This lock is not meant to be *re-entrant*. - - Example: - - .. code-block:: python - - zk = KazooClient() - semaphore = zk.Semaphore("/leasepath", "my-identifier") - with semaphore: # blocks waiting for lock acquisition - # do something with the semaphore - - .. warning:: - - This class stores the allowed max_leases as the data on the - top-level semaphore node. The stored value is checked once - against the max_leases of each instance. This check is - performed when acquire is called the first time. The semaphore - node needs to be deleted to change the allowed leases. - - .. versionadded:: 0.6 - The Semaphore class. - - .. versionadded:: 1.1 - The max_leases check. - - """ - - def __init__(self, client, path, identifier=None, max_leases=1): - """Create a Kazoo Lock - - :param client: A :class:`~kazoo.client.KazooClient` instance. - :param path: The semaphore path to use. - :param identifier: Name to use for this lock contender. This - can be useful for querying to see who the - current lock contenders are. - :param max_leases: The maximum amount of leases available for - the semaphore. - - """ - # Implementation notes about how excessive thundering herd - # and watches are avoided - # - A node (lease pool) holds children for each lease in use - # - A lock is acquired for a process attempting to acquire a - # lease. If a lease is available, the ephemeral node is - # created in the lease pool and the lock is released. 
- # - Only the lock holder watches for children changes in the - # lease pool - self.client = client - self.path = path - - # some data is written to the node. this can be queried via - # contenders() to see who is contending for the lock - self.data = str(identifier or "").encode("utf-8") - self.max_leases = max_leases - self.wake_event = client.handler.event_object() - - self.create_path = self.path + "/" + uuid.uuid4().hex - self.lock_path = path + "-" + "__lock__" - self.is_acquired = False - self.assured_path = False - self.cancelled = False - self._session_expired = False - - def _ensure_path(self): - result = self.client.ensure_path(self.path) - self.assured_path = True - if result is True: - # node did already exist - data, _ = self.client.get(self.path) - try: - leases = int(data.decode("utf-8")) - except (ValueError, TypeError): - # ignore non-numeric data, maybe the node data is used - # for other purposes - pass - else: - if leases != self.max_leases: - raise ValueError( - "Inconsistent max leases: %s, expected: %s" - % (leases, self.max_leases) - ) - else: - self.client.set(self.path, str(self.max_leases).encode("utf-8")) - - def cancel(self): - """Cancel a pending semaphore acquire.""" - self.cancelled = True - self.wake_event.set() - - def acquire(self, blocking=True, timeout=None): - """Acquire the semaphore. By defaults blocks and waits forever. - - :param blocking: Block until semaphore is obtained or - return immediately. - :type blocking: bool - :param timeout: Don't wait forever to acquire the semaphore. - :type timeout: float or None - - :returns: Was the semaphore acquired? - :rtype: bool - - :raises: - ValueError if the max_leases value doesn't match the - stored value. - - :exc:`~kazoo.exceptions.LockTimeout` if the semaphore - wasn't acquired within `timeout` seconds. - - .. versionadded:: 1.1 - The blocking, timeout arguments and the max_leases check. 
- """ - # If the semaphore had previously been canceled, make sure to - # reset that state. - self.cancelled = False - - try: - self.is_acquired = self.client.retry( - self._inner_acquire, blocking=blocking, timeout=timeout - ) - except KazooException: - # if we did ultimately fail, attempt to clean up - self._best_effort_cleanup() - self.cancelled = False - raise - - return self.is_acquired - - def _inner_acquire(self, blocking, timeout=None): - """Inner loop that runs from the top anytime a command hits a - retryable Zookeeper exception.""" - self._session_expired = False - self.client.add_listener(self._watch_session) - - if not self.assured_path: - self._ensure_path() - - # Do we already have a lease? - if self.client.exists(self.create_path): - return True - - w = _Watch(duration=timeout) - w.start() - lock = self.client.Lock(self.lock_path, self.data) - try: - gotten = lock.acquire(blocking=blocking, timeout=w.leftover()) - if not gotten: - return False - while True: - self.wake_event.clear() - - # Attempt to grab our lease... - if self._get_lease(): - return True - - if blocking: - # If blocking, wait until self._watch_lease_change() is - # called before returning - self.wake_event.wait(w.leftover()) - if not self.wake_event.isSet(): - raise LockTimeout( - "Failed to acquire semaphore on %s" - " after %s seconds" % (self.path, timeout) - ) - else: - return False - finally: - lock.release() - - def _watch_lease_change(self, event): - self.wake_event.set() - - def _get_lease(self, data=None): - # Make sure the session is still valid - if self._session_expired: - raise ForceRetryError("Retry on session loss at top") - - # Make sure that the request hasn't been canceled - if self.cancelled: - raise CancelledError("Semaphore cancelled") - - # Get a list of the current potential lock holders. If they change, - # notify our wake_event object. This is used to unblock a blocking - # self._inner_acquire call. 
- children = self.client.get_children( - self.path, self._watch_lease_change - ) - - # If there are leases available, acquire one - if len(children) < self.max_leases: - self.client.create(self.create_path, self.data, ephemeral=True) - - # Check if our acquisition was successful or not. Update our state. - if self.client.exists(self.create_path): - self.is_acquired = True - else: - self.is_acquired = False - - # Return current state - return self.is_acquired - - def _watch_session(self, state): - if state == KazooState.LOST: - self._session_expired = True - self.wake_event.set() - - # Return true to de-register - return True - - def _best_effort_cleanup(self): - try: - self.client.delete(self.create_path) - except KazooException: # pragma: nocover - pass - - def release(self): - """Release the lease immediately.""" - return self.client.retry(self._inner_release) - - def _inner_release(self): - if not self.is_acquired: - return False - try: - self.client.delete(self.create_path) - except NoNodeError: # pragma: nocover - pass - self.is_acquired = False - return True - - def lease_holders(self): - """Return an unordered list of the current lease holders. - - .. note:: - - If the lease holder did not set an identifier, it will - appear as a blank string. 
- - """ - if not self.client.exists(self.path): - return [] - - children = self.client.get_children(self.path) - - lease_holders = [] - for child in children: - try: - data, stat = self.client.get(self.path + "/" + child) - lease_holders.append(data.decode("utf-8")) - except NoNodeError: # pragma: nocover - pass - return lease_holders - - def __enter__(self): - self.acquire() - - def __exit__(self, exc_type, exc_value, traceback): - self.release() diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 73adf5954..87d76bca6 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -233,7 +233,18 @@ class ZKObject: obj._load(context, path=path) return obj + def internalCreate(self, context): + """Create the object in ZK from an existing ZKObject + + This should only be used in special circumstances: when we + know it's safe to start using a ZKObject before it's actually + created in ZK. Normally use .new() + """ + data = self._trySerialize(context) + self._save(context, data, create=True) + def refresh(self, context): + """Update data from ZK""" self._load(context) @@ -308,6 +319,17 @@ class ZKObject: return (compressed_size, uncompressed_size) + def getZKVersion(self): + """Return the ZK version of the object as of the last load/refresh. + + Returns None if the object is newly created. + """ + zstat = getattr(self, '_zstat', None) + # If zstat is None, we created the object + if zstat is None: + return None + return zstat.version + # Private methods below def _retry(self, context, func, *args, max_tries=-1, **kw): |