36 files changed, 925 insertions, 313 deletions
diff --git a/Dockerfile b/Dockerfile index 5a17739f0..df326bd8a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,6 +29,7 @@ ARG REACT_APP_ENABLE_SERVICE_WORKER # Kubectl/Openshift version/sha ARG OPENSHIFT_URL=https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/4.11.20/openshift-client-linux-4.11.20.tar.gz ARG OPENSHIFT_SHA=74f252c812932425ca19636b2be168df8fe57b114af6b114283975e67d987d11 +ARG PBR_VERSION= COPY . /tmp/src COPY --from=js-builder /tmp/src/build /tmp/src/zuul/web/static diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index b80979362..f78b3f0a0 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -112,3 +112,10 @@ Version 12 :Prior Zuul version: 8.0.1 :Description: Adds job_versions and build_versions to BuildSet. Affects schedulers. + +Version 13 +---------- +:Prior Zuul version: 8.2.0 +:Description: Stores only the necessary event info as part of a queue item + instead of the full trigger event. + Affects schedulers. diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst index 7f99c7f7f..f40bee445 100644 --- a/doc/source/monitoring.rst +++ b/doc/source/monitoring.rst @@ -444,6 +444,11 @@ These metrics are emitted by the Zuul :ref:`scheduler`: Indicates if the executor is paused. 1 means paused else 0. + .. stat:: pct_used_hdd + :type: gauge + + The used disk on this executor, as a percentage multiplied by 100. + .. stat:: pct_used_ram :type: gauge @@ -711,6 +716,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`: The size of the current connection event queue. + .. stat:: run_handler + :type: timer + + A timer metric reporting the time taken for one scheduler run + handler iteration. + .. stat:: time_query :type: timer diff --git a/noxfile.py b/noxfile.py index e920f053e..30058cef7 100644 --- a/noxfile.py +++ b/noxfile.py @@ -32,7 +32,6 @@ def set_standard_env_vars(session): set_env(session, 'OS_STDERR_CAPTURE', '1') set_env(session, 'OS_STDOUT_CAPTURE', '1') set_env(session, 'OS_TEST_TIMEOUT', '360') - set_env(session, 'SQLALCHEMY_WARN_20', '1') session.env['PYTHONWARNINGS'] = ','.join([ 'always::DeprecationWarning:zuul.driver.sql.sqlconnection', 'always::DeprecationWarning:tests.base', @@ -49,7 +48,6 @@ def set_standard_env_vars(session): @nox.session(python='3') def bindep(session): set_standard_env_vars(session) - set_env(session, 'SQLALCHEMY_WARN_20', '1') session.install('bindep') session.run('bindep', 'test') diff --git a/requirements.txt b/requirements.txt index 082a18043..e12e9cf72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,7 +18,7 @@ PrettyTable>=0.6,<0.8 babel>=1.0 netaddr kazoo>=2.9.0 -sqlalchemy<2.0.0 +sqlalchemy>=2.0.0 alembic cryptography>=39.0.0 cachecontrol<0.12.7 diff --git a/tests/make_playbooks.py b/tests/make_playbooks.py index 93c37bc81..cb7a98096 100755 --- a/tests/make_playbooks.py +++ b/tests/make_playbooks.py @@ -40,7 +40,8 @@ def handle_repo(path): config_path = os.path.join(path, fn) break try: - config = yaml.safe_load(open(config_path)) + with open(config_path) as f: + config = yaml.safe_load(f) except Exception: print(" Has yaml errors") return diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index c30e1743d..26a99215e 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -65,7 +65,7 @@ class TestSQLConnectionMysql(ZuulTestCase): def _sql_tables_created(self, connection_name): connection = 
self.scheds.first.connections.connections[connection_name] - insp = sa.engine.reflection.Inspector(connection.engine) + insp = sa.inspect(connection.engine) table_prefix = connection.table_prefix self.assertEqual(self.expected_table_prefix, table_prefix) @@ -82,7 +82,7 @@ class TestSQLConnectionMysql(ZuulTestCase): def _sql_indexes_created(self, connection_name): connection = self.scheds.first.connections.connections[connection_name] - insp = sa.engine.reflection.Inspector(connection.engine) + insp = sa.inspect(connection.engine) table_prefix = connection.table_prefix self.assertEqual(self.expected_table_prefix, table_prefix) @@ -127,7 +127,7 @@ class TestSQLConnectionMysql(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table])) + sa.sql.select(reporter.connection.zuul_buildset_table)) buildsets = result.fetchall() self.assertEqual(5, len(buildsets)) @@ -137,107 +137,107 @@ class TestSQLConnectionMysql(ZuulTestCase): buildset3 = buildsets[3] buildset4 = buildsets[4] - self.assertEqual('check', buildset0['pipeline']) - self.assertEqual('org/project', buildset0['project']) - self.assertEqual(1, buildset0['change']) - self.assertEqual('1', buildset0['patchset']) - self.assertEqual('SUCCESS', buildset0['result']) - self.assertEqual('Build succeeded.', buildset0['message']) - self.assertEqual('tenant-one', buildset0['tenant']) + self.assertEqual('check', buildset0.pipeline) + self.assertEqual('org/project', buildset0.project) + self.assertEqual(1, buildset0.change) + self.assertEqual('1', buildset0.patchset) + self.assertEqual('SUCCESS', buildset0.result) + self.assertEqual('Build succeeded.', buildset0.message) + self.assertEqual('tenant-one', buildset0.tenant) self.assertEqual( - 'https://review.example.com/%d' % buildset0['change'], - buildset0['ref_url']) - self.assertNotEqual(None, buildset0['event_id']) - self.assertNotEqual(None, buildset0['event_timestamp']) + 'https://review.example.com/%d' % buildset0.change, + buildset0.ref_url) + self.assertNotEqual(None, buildset0.event_id) + self.assertNotEqual(None, buildset0.event_timestamp) buildset0_builds = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset0['id'] + buildset0.id ) ).fetchall() # Check the first result, which should be the project-merge job self.assertEqual( - 'project-merge', buildset0_builds[0]['job_name']) - self.assertEqual("SUCCESS", buildset0_builds[0]['result']) - self.assertEqual(None, buildset0_builds[0]['log_url']) - self.assertEqual('check', buildset1['pipeline']) - self.assertEqual('master', buildset1['branch']) - self.assertEqual('org/project', buildset1['project']) - self.assertEqual(2, buildset1['change']) - self.assertEqual('1', buildset1['patchset']) - self.assertEqual('FAILURE', buildset1['result']) - self.assertEqual('Build failed.', buildset1['message']) + 'project-merge', buildset0_builds[0].job_name) + self.assertEqual("SUCCESS", buildset0_builds[0].result) + self.assertEqual(None, buildset0_builds[0].log_url) + self.assertEqual('check', buildset1.pipeline) + self.assertEqual('master', buildset1.branch) + self.assertEqual('org/project', buildset1.project) + self.assertEqual(2, buildset1.change) + self.assertEqual('1', buildset1.patchset) + self.assertEqual('FAILURE', buildset1.result) + self.assertEqual('Build failed.', buildset1.message) buildset1_builds = conn.execute( - sa.sql.select([ + sa.sql.select( 
reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset1['id'] + buildset1.id ) ).fetchall() # Check the second result, which should be the project-test1 # job which failed self.assertEqual( - 'project-test1', buildset1_builds[1]['job_name']) - self.assertEqual("FAILURE", buildset1_builds[1]['result']) - self.assertEqual(None, buildset1_builds[1]['log_url']) + 'project-test1', buildset1_builds[1].job_name) + self.assertEqual("FAILURE", buildset1_builds[1].result) + self.assertEqual(None, buildset1_builds[1].log_url) buildset2_builds = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset2['id'] + buildset2.id ) ).fetchall() # Check the first result, which should be the project-publish # job self.assertEqual('project-publish', - buildset2_builds[0]['job_name']) - self.assertEqual("SUCCESS", buildset2_builds[0]['result']) + buildset2_builds[0].job_name) + self.assertEqual("SUCCESS", buildset2_builds[0].result) buildset3_builds = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset3['id'] + buildset3.id ) ).fetchall() self.assertEqual( - 'project-test1', buildset3_builds[1]['job_name']) - self.assertEqual('NODE_FAILURE', buildset3_builds[1]['result']) - self.assertEqual(None, buildset3_builds[1]['log_url']) - self.assertIsNotNone(buildset3_builds[1]['start_time']) - self.assertIsNotNone(buildset3_builds[1]['end_time']) + 'project-test1', buildset3_builds[1].job_name) + self.assertEqual('NODE_FAILURE', buildset3_builds[1].result) + self.assertEqual(None, buildset3_builds[1].log_url) + self.assertIsNotNone(buildset3_builds[1].start_time) + self.assertIsNotNone(buildset3_builds[1].end_time) self.assertGreaterEqual( - buildset3_builds[1]['end_time'], - buildset3_builds[1]['start_time']) + buildset3_builds[1].end_time, + buildset3_builds[1].start_time) # Check the paused build result buildset4_builds = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_table - ]).where( + ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset4['id'] + buildset4.id ).order_by(reporter.connection.zuul_build_table.c.id) ).fetchall() paused_build_events = conn.execute( - sa.sql.select([ + sa.sql.select( reporter.connection.zuul_build_event_table - ]).where( + ).where( reporter.connection.zuul_build_event_table.c.build_id - == buildset4_builds[0]["id"] + == buildset4_builds[0].id ) ).fetchall() @@ -245,16 +245,16 @@ class TestSQLConnectionMysql(ZuulTestCase): pause_event = paused_build_events[0] resume_event = paused_build_events[1] self.assertEqual( - pause_event["event_type"], "paused") - self.assertIsNotNone(pause_event["event_time"]) - self.assertIsNone(pause_event["description"]) + pause_event.event_type, "paused") + self.assertIsNotNone(pause_event.event_time) + self.assertIsNone(pause_event.description) self.assertEqual( - resume_event["event_type"], "resumed") - self.assertIsNotNone(resume_event["event_time"]) - self.assertIsNone(resume_event["description"]) + resume_event.event_type, "resumed") + self.assertIsNotNone(resume_event.event_time) + self.assertIsNone(resume_event.description) self.assertGreater( - resume_event["event_time"], pause_event["event_time"]) + resume_event.event_time, pause_event.event_time) self.executor_server.hold_jobs_in_build = True 
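The test_connection.py churn above tracks two SQLAlchemy 2.0 API breaks at once: select() now takes its selectables positionally rather than as a list, and result rows are named-tuple-like, so attribute access replaces the old mapping-style row['col']. The same release is why the tests now obtain the inspector via sa.inspect(). A minimal standalone sketch of all three changes, using an illustrative table rather than Zuul's real schema:

    import sqlalchemy as sa

    engine = sa.create_engine("sqlite://")
    metadata = sa.MetaData()
    buildset = sa.Table(
        "zuul_buildset", metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("pipeline", sa.String(255)),
    )
    metadata.create_all(engine)

    # Reflection: sa.inspect() replaces constructing an Inspector directly.
    insp = sa.inspect(engine)
    assert "zuul_buildset" in insp.get_table_names()

    with engine.begin() as conn:
        conn.execute(sa.insert(buildset), [{"pipeline": "check"}])
        # 2.0 style: select(table), not select([table]).
        row = conn.execute(sa.sql.select(buildset)).fetchall()[0]
        assert row.pipeline == "check"              # attribute access
        assert row._mapping["pipeline"] == "check"  # dict-style fallback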
@@ -333,51 +333,51 @@ class TestSQLConnectionMysql(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table]) + sa.sql.select(reporter.connection.zuul_buildset_table) ) buildsets = result.fetchall() self.assertEqual(1, len(buildsets)) buildset0 = buildsets[0] - self.assertEqual('check', buildset0['pipeline']) - self.assertEqual('org/project', buildset0['project']) - self.assertEqual(1, buildset0['change']) - self.assertEqual('1', buildset0['patchset']) - self.assertEqual('SUCCESS', buildset0['result']) - self.assertEqual('Build succeeded.', buildset0['message']) - self.assertEqual('tenant-one', buildset0['tenant']) + self.assertEqual('check', buildset0.pipeline) + self.assertEqual('org/project', buildset0.project) + self.assertEqual(1, buildset0.change) + self.assertEqual('1', buildset0.patchset) + self.assertEqual('SUCCESS', buildset0.result) + self.assertEqual('Build succeeded.', buildset0.message) + self.assertEqual('tenant-one', buildset0.tenant) self.assertEqual( - 'https://review.example.com/%d' % buildset0['change'], - buildset0['ref_url']) + 'https://review.example.com/%d' % buildset0.change, + buildset0.ref_url) buildset0_builds = conn.execute( sa.sql.select( - [reporter.connection.zuul_build_table] + reporter.connection.zuul_build_table ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset0['id'] + buildset0.id ) ).fetchall() # Check the retry results - self.assertEqual('project-merge', buildset0_builds[0]['job_name']) - self.assertEqual('SUCCESS', buildset0_builds[0]['result']) - self.assertTrue(buildset0_builds[0]['final']) - - self.assertEqual('project-test1', buildset0_builds[1]['job_name']) - self.assertEqual('RETRY', buildset0_builds[1]['result']) - self.assertFalse(buildset0_builds[1]['final']) - self.assertEqual('project-test2', buildset0_builds[2]['job_name']) - self.assertEqual('RETRY', buildset0_builds[2]['result']) - self.assertFalse(buildset0_builds[2]['final']) - - self.assertEqual('project-test1', buildset0_builds[3]['job_name']) - self.assertEqual('SUCCESS', buildset0_builds[3]['result']) - self.assertTrue(buildset0_builds[3]['final']) - self.assertEqual('project-test2', buildset0_builds[4]['job_name']) - self.assertEqual('SUCCESS', buildset0_builds[4]['result']) - self.assertTrue(buildset0_builds[4]['final']) + self.assertEqual('project-merge', buildset0_builds[0].job_name) + self.assertEqual('SUCCESS', buildset0_builds[0].result) + self.assertTrue(buildset0_builds[0].final) + + self.assertEqual('project-test1', buildset0_builds[1].job_name) + self.assertEqual('RETRY', buildset0_builds[1].result) + self.assertFalse(buildset0_builds[1].final) + self.assertEqual('project-test2', buildset0_builds[2].job_name) + self.assertEqual('RETRY', buildset0_builds[2].result) + self.assertFalse(buildset0_builds[2].final) + + self.assertEqual('project-test1', buildset0_builds[3].job_name) + self.assertEqual('SUCCESS', buildset0_builds[3].result) + self.assertTrue(buildset0_builds[3].final) + self.assertEqual('project-test2', buildset0_builds[4].job_name) + self.assertEqual('SUCCESS', buildset0_builds[4].result) + self.assertTrue(buildset0_builds[4].final) self.executor_server.hold_jobs_in_build = True @@ -430,7 +430,7 @@ class TestSQLConnectionMysql(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table]) + sa.sql.select(reporter.connection.zuul_buildset_table) ) buildsets = result.fetchall() @@ -439,10 +439,10 @@ class 
TestSQLConnectionMysql(ZuulTestCase): buildset0_builds = conn.execute( sa.sql.select( - [reporter.connection.zuul_build_table] + reporter.connection.zuul_build_table ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset0['id'] + buildset0.id ) ).fetchall() @@ -488,7 +488,7 @@ class TestSQLConnectionMysql(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table]) + sa.sql.select(reporter.connection.zuul_buildset_table) ) buildsets = result.fetchall() @@ -497,10 +497,10 @@ class TestSQLConnectionMysql(ZuulTestCase): buildset0_builds = conn.execute( sa.sql.select( - [reporter.connection.zuul_build_table] + reporter.connection.zuul_build_table ).where( reporter.connection.zuul_build_table.c.buildset_id == - buildset0['id'] + buildset0.id ) ).fetchall() diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py index 2e3057af6..2a63d5ef8 100644 --- a/tests/unit/test_gerrit.py +++ b/tests/unit/test_gerrit.py @@ -957,3 +957,48 @@ class TestGerritConnection(ZuulTestCase): self.assertEqual(B.queried, 2) self.assertEqual(A.data['status'], 'MERGED') self.assertEqual(B.data['status'], 'MERGED') + + +class TestGerritUnicodeRefs(ZuulTestCase): + config_file = 'zuul-gerrit-web.conf' + tenant_config_file = 'config/single-tenant/main.yaml' + + upload_pack_data = (b'014452944ee370db5c87691e62e0f9079b6281319b4e HEAD' + b'\x00multi_ack thin-pack side-band side-band-64k ' + b'ofs-delta shallow deepen-since deepen-not ' + b'deepen-relative no-progress include-tag ' + b'multi_ack_detailed allow-tip-sha1-in-want ' + b'allow-reachable-sha1-in-want ' + b'symref=HEAD:refs/heads/faster filter ' + b'object-format=sha1 agent=git/2.37.1.gl1\n' + b'003d5f42665d737b3fd4ec22ca0209e6191859f09fd6 ' + b'refs/for/faster\n' + b'004952944ee370db5c87691e62e0f9079b6281319b4e ' + b'refs/heads/foo/\xf0\x9f\x94\xa5\xf0\x9f\x94\xa5' + b'\xf0\x9f\x94\xa5\n' + b'003f52944ee370db5c87691e62e0f9079b6281319b4e ' + b'refs/heads/faster\n0000').decode("utf-8") + + def test_mb_unicode_refs(self): + gerrit_config = { + 'user': 'gerrit', + 'server': 'localhost', + } + driver = GerritDriver() + gerrit = GerritConnection(driver, 'review_gerrit', gerrit_config) + + def _uploadPack(project): + return self.upload_pack_data + + self.patch(gerrit, '_uploadPack', _uploadPack) + + project = gerrit.source.getProject('org/project') + refs = gerrit.getInfoRefs(project) + + self.assertEqual(refs, + {'refs/for/faster': + '5f42665d737b3fd4ec22ca0209e6191859f09fd6', + 'refs/heads/foo/🔥🔥🔥': + '52944ee370db5c87691e62e0f9079b6281319b4e', + 'refs/heads/faster': + '52944ee370db5c87691e62e0f9079b6281319b4e'}) diff --git a/tests/unit/test_git_driver.py b/tests/unit/test_git_driver.py index 06e2ac7c8..95fca30d3 100644 --- a/tests/unit/test_git_driver.py +++ b/tests/unit/test_git_driver.py @@ -62,7 +62,8 @@ class TestGitDriver(ZuulTestCase): # Update zuul.yaml to force a tenant reconfiguration path = os.path.join(self.upstream_root, 'common-config', 'zuul.yaml') - config = yaml.safe_load(open(path, 'r').read()) + with open(path, 'r') as f: + config = yaml.safe_load(f) change = { 'name': 'org/project', 'check': { diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py index e3706440b..fb46aa7d1 100644 --- a/tests/unit/test_github_driver.py +++ b/tests/unit/test_github_driver.py @@ -18,8 +18,10 @@ import re from testtools.matchers import MatchesRegex, Not, StartsWith import urllib import socket +import threading import time import textwrap +from 
concurrent.futures import ThreadPoolExecutor from unittest import mock, skip import git @@ -32,10 +34,11 @@ from zuul.zk.layout import LayoutState from zuul.lib import strings from zuul.merger.merger import Repo from zuul.model import MergeRequest, EnqueueEvent, DequeueEvent +from zuul.zk.change_cache import ChangeKey from tests.base import (AnsibleZuulTestCase, BaseTestCase, ZuulGithubAppTestCase, ZuulTestCase, - simple_layout, random_sha1) + simple_layout, random_sha1, iterate_timeout) from tests.base import ZuulWebFixture EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) @@ -1484,6 +1487,44 @@ class TestGithubDriver(ZuulTestCase): "rebase not supported", str(loading_errors[0].error)) + @simple_layout("layouts/basic-github.yaml", driver="github") + def test_concurrent_get_change(self): + """ + Test that getting a change concurrently returns the same + object from the cache. + """ + conn = self.scheds.first.sched.connections.connections["github"] + + # Create a new change object and remove it from the cache so + # the concurrent call will try to create a new change object. + A = self.fake_github.openFakePullRequest("org/project", "master", "A") + change_key = ChangeKey(conn.connection_name, "org/project", + "PullRequest", str(A.number), str(A.head_sha)) + change = conn.getChange(change_key, refresh=True) + conn._change_cache.delete(change_key) + + # Acquire the update lock so the concurrent get task needs to + # wait for the lock to be released. + lock = conn._change_update_lock.setdefault(change_key, + threading.Lock()) + lock.acquire() + try: + executor = ThreadPoolExecutor(max_workers=1) + task = executor.submit(conn.getChange, change_key, refresh=True) + for _ in iterate_timeout(5, "task to be running"): + if task.running(): + break + # Add the change back so the waiting task can get the + # change from the cache. + conn._change_cache.set(change_key, change) + finally: + lock.release() + executor.shutdown() + + other_change = task.result() + self.assertIsNotNone(other_change.cache_stat) + self.assertIs(change, other_change) + class TestMultiGithubDriver(ZuulTestCase): config_file = 'zuul-multi-github.conf' diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py index 7806db347..f907cb8b4 100644 --- a/tests/unit/test_merger_repo.py +++ b/tests/unit/test_merger_repo.py @@ -163,6 +163,64 @@ class TestMergerRepo(ZuulTestCase): work_repo.reset() work_repo.checkout("foobar") + def test_rebase_merge_conflict_abort(self): + """Test that a failed rebase is properly aborted and related + directories are cleaned up.""" + parent_path = os.path.join(self.upstream_root, 'org/project1') + parent_repo = git.Repo(parent_path) + parent_repo.create_head("feature") + + files = {"test.txt": "master"} + self.create_commit("org/project1", files=files, head="master", + message="Add master file") + + files = {"test.txt": "feature"} + self.create_commit("org/project1", files=files, head="feature", + message="Add feature file") + + work_repo = Repo(parent_path, self.workspace_root, + "none@example.org", "User Name", "0", "0") + + item = {"ref": "refs/heads/feature"} + # We expect the rebase to fail because of a conflict, but the + # rebase will be aborted. + with testtools.ExpectedException(git.exc.GitCommandError): + work_repo.rebaseMerge(item, "master") + + # Assert that the failed rebase doesn't leave any temporary + # directories behind.
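The concurrency test above pins down the contract that the githubconnection.py hunk later in this diff restores: a caller that blocks on the per-key update lock must re-read the change cache once it acquires the lock, instead of returning its stale pre-lock object. A minimal sketch of that double-checked pattern, with illustrative names rather than Zuul's API:

    import threading

    _cache = {}
    _locks = {}

    def get_change(key, fetch):
        change = _cache.get(key)
        if change is not None:
            return change
        lock = _locks.setdefault(key, threading.Lock())
        with lock:
            # Re-read after acquiring the lock: another thread may have
            # populated the cache while we waited (this re-read is the
            # one-line fix in githubconnection.py below).
            change = _cache.get(key)
            if change is None:
                change = _cache[key] = fetch(key)
            return change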
+ self.assertFalse( + os.path.exists(f"{work_repo.local_path}/.git/rebase-merge")) + self.assertFalse( + os.path.exists(f"{work_repo.local_path}/.git/rebase-apply")) + + def test_rebase_merge_conflict_reset_cleanup(self): + """Test temporary directories of a failed rebase merge are + removed on repo reset.""" + parent_path = os.path.join(self.upstream_root, 'org/project1') + parent_repo = git.Repo(parent_path) + parent_repo.create_head("feature") + + files = {"master.txt": "master"} + self.create_commit("org/project1", files=files, head="master", + message="Add master file") + + files = {"feature.txt": "feature"} + self.create_commit("org/project1", files=files, head="feature", + message="Add feature file") + + work_repo = Repo(parent_path, self.workspace_root, + "none@example.org", "User Name", "0", "0") + + # Simulate leftovers from a failed rebase + os.mkdir(f"{work_repo.local_path}/.git/rebase-merge") + os.mkdir(f"{work_repo.local_path}/.git/rebase-apply") + + # Resetting the repo should clean up any leaked directories + work_repo.reset() + item = {"ref": "refs/heads/feature"} + work_repo.rebaseMerge(item, "master") + def test_set_refs(self): parent_path = os.path.join(self.upstream_root, 'org/project1') remote_sha = self.create_commit('org/project1') diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index a5a49bed4..c6cdee7ea 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -293,6 +293,33 @@ class TestModelUpgrade(ZuulTestCase): result='SUCCESS', changes='1,1'), ], ordered=False) + @model_version(12) + def test_model_12_13(self): + # Initially queue items will still have the full trigger event + # stored in Zookeeper. The trigger event will be converted to + # an event info object after the model API update. 
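The two rebase tests above exercise the merger.py changes further down in this diff: a conflicting rebase is aborted so the work tree stays usable, and rebase state directories leaked by an interrupted rebase are removed on reset. Both behaviors sketched with plain GitPython (the function names here are illustrative; Zuul's versions live on its Repo wrapper):

    import os
    import shutil
    import git

    def rebase_or_abort(repo: git.Repo, onto: str):
        # Abort a conflicting rebase, then re-raise for the caller.
        try:
            repo.git.rebase(onto)
        except git.exc.GitCommandError:
            repo.git.rebase(abort=True)  # runs "git rebase --abort"
            raise

    def cleanup_leaked_rebase_dirs(local_path: str):
        # A rebase interrupted by a crash leaves these directories
        # behind, and git refuses to start the next rebase until they
        # are gone.
        for name in (".git/rebase-merge", ".git/rebase-apply"):
            leaked = os.path.join(local_path, name)
            if os.path.exists(leaked):
                shutil.rmtree(leaked)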
+ self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 1) + + # Upgrade our component + self.model_test_component_info.model_api = 13 + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + dict(name='project1-project2-integration', + result='SUCCESS', changes='1,1'), + ], ordered=False) + class TestGithubModelUpgrade(ZuulTestCase): config_file = 'zuul-github-driver.conf' diff --git a/tests/unit/test_reporting.py b/tests/unit/test_reporting.py index 2cf93cdcb..0c5c5fbc9 100644 --- a/tests/unit/test_reporting.py +++ b/tests/unit/test_reporting.py @@ -151,7 +151,7 @@ class TestReporting(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table])) + sa.sql.select(reporter.connection.zuul_buildset_table)) buildsets = result.fetchall() for x in buildsets: @@ -180,7 +180,7 @@ class TestReporting(ZuulTestCase): engine.connect() as conn: result = conn.execute( - sa.sql.select([reporter.connection.zuul_buildset_table])) + sa.sql.select(reporter.connection.zuul_buildset_table)) buildsets = result.fetchall() for x in buildsets: diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 35e632d46..172ed34dc 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -54,7 +54,7 @@ from tests.base import ( from zuul.zk.change_cache import ChangeKey from zuul.zk.event_queues import PIPELINE_NAME_ROOT from zuul.zk.layout import LayoutState -from zuul.zk.locks import management_queue_lock +from zuul.zk.locks import management_queue_lock, pipeline_lock from zuul.zk import zkobject EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) @@ -461,6 +461,7 @@ class TestScheduler(ZuulTestCase): 'zuul.mergers.online', value='1', kind='g') self.assertReportedStat('zuul.scheduler.eventqueues.connection.gerrit', value='0', kind='g') + self.assertReportedStat('zuul.scheduler.run_handler', kind='ms') # Catch time / monotonic errors for key in [ @@ -490,9 +491,10 @@ class TestScheduler(ZuulTestCase): 'zuul.tenant.tenant-one.pipeline.gate.write_objects', 'zuul.tenant.tenant-one.pipeline.gate.read_znodes', 'zuul.tenant.tenant-one.pipeline.gate.write_znodes', - 'zuul.tenant.tenant-one.pipeline.gate.read_bytes', 'zuul.tenant.tenant-one.pipeline.gate.write_bytes', ]: + # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is + # expected to be zero since it's initialized after reading val = self.assertReportedStat(key, kind='g') self.assertTrue(0.0 < float(val) < 60000.0) @@ -3587,8 +3589,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3610,8 +3615,11 @@ class 
TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3633,8 +3641,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3655,8 +3666,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3678,8 +3692,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - with self.createZKContext() as ctx,\ - gate.manager.currentContext(ctx): + with pipeline_lock( + self.zk_client, tenant.name, + gate.name) as lock,\ + self.createZKContext(lock) as ctx,\ + gate.manager.currentContext(ctx): gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) q1 = gate.getQueue(project1.canonical_name, None) @@ -3943,6 +3960,10 @@ class TestScheduler(ZuulTestCase): else: time.sleep(0) + self.assertGreater(new.last_reconfigured, old.last_reconfigured) + self.assertGreater(new.last_reconfigure_event_ltime, + old.last_reconfigure_event_ltime) + def test_tenant_reconfiguration_command_socket(self): "Test that single-tenant reconfiguration via command socket works" @@ -6200,7 +6221,8 @@ For CI problems and help debugging, contact ci@example.org""" build = self.getBuildByName('check-job') inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml') - inventory = yaml.safe_load(open(inv_path, 'r')) + with open(inv_path, 'r') as f: + inventory = yaml.safe_load(f) label = inventory['all']['hosts']['controller']['nodepool']['label'] self.assertEqual('slow-label', label) diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py index de8b8f3ad..004ede862 100644 --- a/tests/unit/test_v3.py +++ b/tests/unit/test_v3.py @@ -1557,6 +1557,43 @@ class TestInRepoConfig(ZuulTestCase): 'start_line': 5}, }) + def test_dynamic_config_job_anchors(self): + # Test the use of anchors in job configuration. This is a + # regression test designed to catch a failure where we freeze + # the first job and in doing so, mutate the vars dict. The + # intended behavior is that the two jobs end up with two + # separate python objects for their vars dicts. 
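The repeated test change above swaps a bare ZK context for one created under the pipeline lock, so pipeline state refreshes run with the same lock ordering the scheduler uses. A rough sketch of that ordering with kazoo directly (the lock path is illustrative; Zuul wraps this in zuul.zk.locks.pipeline_lock):

    from kazoo.client import KazooClient

    client = KazooClient(hosts="localhost:2181")
    client.start()
    try:
        # Take the pipeline's lock first, then do the locked work with
        # a context that carries the lock.
        with client.Lock("/zuul/locks/pipelines/tenant-one/gate"):
            pass  # create a ZKContext and refresh pipeline state here
    finally:
        client.stop()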
+ in_repo_conf = textwrap.dedent( + """ + - job: + name: myvars + vars: &anchor + plugins: + foo: bar + + - job: + name: project-test1 + timeout: 999999999999 + vars: *anchor + + - project: + name: org/project + check: + jobs: + - project-test1 + """) + + file_dict = {'.zuul.yaml': in_repo_conf} + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A', + files=file_dict) + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + self.assertEqual(A.reported, 1, + "A should report failure") + self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1") + self.assertIn('max-job-timeout', A.messages[0]) + self.assertHistory([]) + def test_dynamic_config_non_existing_job_in_template(self): """Test that requesting a non existent job fails""" in_repo_conf = textwrap.dedent( diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 7e3c19dfe..b5697ee36 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -18,6 +18,7 @@ import json import queue import threading import uuid +from unittest import mock import testtools @@ -53,10 +54,12 @@ from tests.base import ( BaseTestCase, HoldableExecutorApi, HoldableMergerApi, iterate_timeout ) -from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext +from zuul.zk.zkobject import ( + ShardedZKObject, ZKObject, ZKContext +) from zuul.zk.locks import tenant_write_lock -from kazoo.exceptions import ZookeeperError, OperationTimeoutError +from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError class ZooKeeperBaseTestCase(BaseTestCase): @@ -2037,3 +2040,80 @@ class TestBlobStore(ZooKeeperBaseTestCase): with testtools.ExpectedException(KeyError): bs.get(path) + + +class TestPipelineInit(ZooKeeperBaseTestCase): + # Test the initialize-on-refresh code paths of various pipeline objects + + def test_pipeline_state_new_object(self): + # Test the initialize-on-refresh code path with no existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, pipeline.state) + context = ZKContext(self.zk_client, None, None, self.log) + pipeline.state.refresh(context) + self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath())) + self.assertEqual(pipeline.state.layout_uuid, layout.uuid) + + def test_pipeline_state_existing_object(self): + # Test the initialize-on-refresh code path with a pre-existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.manager = mock.Mock() + pipeline.state = model.PipelineState.create( + pipeline, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + # We refresh the change list here purely for the side effect + # of creating the pipeline state object with no data (the list + # is a subpath of the state object). 
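The anchors regression test above, and the copy_safe_config() helper added to configloader.py later in this diff, hinge on the same PyYAML behavior: an alias resolves to the very same Python object as its anchor, so freezing one job's vars in place would silently mutate the other job's. A standalone demonstration (not Zuul code):

    import copy
    import yaml

    doc = """
    - job:
        name: myvars
        vars: &anchor
          plugins:
            foo: bar
    - job:
        name: project-test1
        vars: *anchor
    """

    jobs = yaml.safe_load(doc)
    vars_a = jobs[0]["job"]["vars"]
    vars_b = jobs[1]["job"]["vars"]
    assert vars_a is vars_b        # alias and anchor share one dict

    # Hence the deep copy before freezing/mutating configuration:
    frozen = copy.deepcopy(vars_b)
    frozen["plugins"]["foo"] = "changed"
    assert vars_a["plugins"]["foo"] == "bar"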
+ pipeline.change_list.refresh(context) + pipeline.state.refresh(context) + self.assertTrue( + self.zk_client.client.exists(pipeline.change_list.getPath())) + self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath())) + self.assertEqual(pipeline.state.layout_uuid, layout.uuid) + + def test_pipeline_change_list_new_object(self): + # Test the initialize-on-refresh code path with no existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + pipeline.change_list.refresh(context) + self.assertTrue( + self.zk_client.client.exists(pipeline.change_list.getPath())) + pipeline.manager = mock.Mock() + pipeline.state.refresh(context) + self.assertEqual(pipeline.state.layout_uuid, layout.uuid) + + def test_pipeline_change_list_new_object_without_lock(self): + # Test the initialize-on-refresh code path if we don't have + # the lock. This should fail. + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + with testtools.ExpectedException(NoNodeError): + pipeline.change_list.refresh(context, allow_init=False) + self.assertIsNone( + self.zk_client.client.exists(pipeline.change_list.getPath())) + pipeline.manager = mock.Mock() + pipeline.state.refresh(context) + self.assertEqual(pipeline.state.layout_uuid, layout.uuid) diff --git a/tools/docker-compose.yaml b/tools/docker-compose.yaml index 83ab9f930..05b4905e2 100644 --- a/tools/docker-compose.yaml +++ b/tools/docker-compose.yaml @@ -3,7 +3,7 @@ version: "3" services: mysql: container_name: zuul-test-mysql - image: mysql:5.7 + image: mysql:8.0 environment: - MYSQL_ROOT_PASSWORD=insecure_worker ports: diff --git a/tools/test-setup-docker.sh b/tools/test-setup-docker.sh index a0fcf9f5a..1601b11a7 100755 --- a/tools/test-setup-docker.sh +++ b/tools/test-setup-docker.sh @@ -58,7 +58,8 @@ timeout 30 bash -c "until ${ROOTCMD} ${MYSQL} -e 'show databases'; do sleep 0.5; echo echo "Setting up permissions for zuul tests" -${ROOTCMD} ${MYSQL} -e "GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'%' identified by 'openstack_citest' WITH GRANT OPTION;" +${ROOTCMD} ${MYSQL} -e "CREATE USER 'openstack_citest'@'%' identified by 'openstack_citest';" +${ROOTCMD} ${MYSQL} -e "GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'%' WITH GRANT OPTION;" ${ROOTCMD} ${MYSQL} -u openstack_citest -popenstack_citest -e "SET default_storage_engine=MYISAM; DROP DATABASE IF EXISTS openstack_citest; CREATE DATABASE openstack_citest CHARACTER SET utf8;" echo "Finished" diff --git a/web/public/openapi.yaml b/web/public/openapi.yaml index d69111cf8..cb2e10b37 100644 --- a/web/public/openapi.yaml +++ b/web/public/openapi.yaml @@ -249,7 +249,7 @@ paths: - tenant /api/tenant/{tenant}/key/{project}.pub: get: - operationId: get-project-key + operationId: get-project-secrets-key parameters: - description: The tenant name in: path @@ -275,12 +275,44 @@ paths: ' schema: - description: The project public key + description: The project secrets public key in PKCS8 format type: string - description: Returns the 
project public key + description: Returns the project public key that is used to encrypt secrets '404': description: Tenant or Project not found - summary: Get a project public key + summary: Get a project public key that is used to encrypt secrets + tags: + - tenant + /api/tenant/{tenant}/project-ssh-key/{project}.pub: + get: + operationId: get-project-ssh-key + parameters: + - description: The tenant name + in: path + name: tenant + required: true + schema: + type: string + - description: The project name + in: path + name: project + required: true + schema: + type: string + responses: + '200': + content: + text/plain: + example: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACA + + ' + schema: + description: The project ssh public key in SSH2 format + type: string + description: Returns the project public key that executor adds to SSH agent + '404': + description: Tenant or Project not found + summary: Get a project public key that is used for SSH in post-merge pipelines tags: - tenant /api/tenant/{tenant}/semaphores: diff --git a/zuul/ansible/logconfig.py b/zuul/ansible/logconfig.py index 66881336a..2d7c37463 100644 --- a/zuul/ansible/logconfig.py +++ b/zuul/ansible/logconfig.py @@ -140,7 +140,8 @@ def _read_config_file(filename: str): raise ValueError("Unable to read logging config file at %s" % filename) if os.path.splitext(filename)[1] in ('.yml', '.yaml', '.json'): - return yaml.safe_load(open(filename, 'r')) + with open(filename, 'r') as f: + return yaml.safe_load(f) return filename diff --git a/zuul/configloader.py b/zuul/configloader.py index 16bff1401..fe22fe0f8 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -437,6 +437,30 @@ def ansible_vars_dict(value): ansible_var_name(key) +def copy_safe_config(conf): + """Return a deep copy of a config dictionary. + + This lets us assign values of a config dictionary to configuration + objects, even if those values are nested dictionaries. This way + we can safely freeze the configuration object (the process of + which mutates dictionaries) without mutating the original + configuration. + + Meanwhile, this does retain the original context information as a + single object (some behaviors rely on mutating the source context + (e.g., pragma)). 
+ + """ + ret = copy.deepcopy(conf) + for key in ( + '_source_context', + '_start_mark', + ): + if key in conf: + ret[key] = conf[key] + return ret + + class PragmaParser(object): pragma = { 'implied-branch-matchers': bool, @@ -452,6 +476,7 @@ class PragmaParser(object): self.pcontext = pcontext def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) bm = conf.get('implied-branch-matchers') @@ -512,6 +537,7 @@ class NodeSetParser(object): return vs.Schema(nodeset) def fromYaml(self, conf, anonymous=False): + conf = copy_safe_config(conf) if anonymous: self.anon_schema(conf) self.anonymous = True @@ -599,6 +625,7 @@ class SecretParser(object): return vs.Schema(secret) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) s = model.Secret(conf['name'], conf['_source_context']) s.source_context = conf['_source_context'] @@ -723,6 +750,7 @@ class JobParser(object): def fromYaml(self, conf, project_pipeline=False, name=None, validate=True): + conf = copy_safe_config(conf) if validate: self.schema(conf) @@ -1075,6 +1103,7 @@ class ProjectTemplateParser(object): return vs.Schema(project) def fromYaml(self, conf, validate=True, freeze=True): + conf = copy_safe_config(conf) if validate: self.schema(conf) source_context = conf['_source_context'] @@ -1165,6 +1194,7 @@ class ProjectParser(object): return vs.Schema(project) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) project_name = conf.get('name') @@ -1328,6 +1358,7 @@ class PipelineParser(object): return vs.Schema(pipeline) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) pipeline = model.Pipeline(conf['name'], self.pcontext.tenant) pipeline.source_context = conf['_source_context'] @@ -1469,6 +1500,7 @@ class SemaphoreParser(object): return vs.Schema(semaphore) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) semaphore = model.Semaphore(conf['name'], conf.get('max', 1)) semaphore.source_context = conf.get('_source_context') @@ -1494,6 +1526,7 @@ class QueueParser: return vs.Schema(queue) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) queue = model.Queue( conf['name'], @@ -1523,6 +1556,7 @@ class AuthorizationRuleParser(object): return vs.Schema(authRule) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) a = model.AuthZRuleTree(conf['name']) @@ -1556,6 +1590,7 @@ class GlobalSemaphoreParser(object): return vs.Schema(semaphore) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) semaphore = model.Semaphore(conf['name'], conf.get('max', 1), global_scope=True) @@ -1576,6 +1611,7 @@ class ApiRootParser(object): return vs.Schema(api_root) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) api_root = model.ApiRoot(conf.get('authentication-realm')) api_root.access_rules = conf.get('access-rules', []) @@ -1697,7 +1733,7 @@ class TenantParser(object): 'disallowed-labels': to_list(str), 'allow-circular-dependencies': bool, 'default-parent': str, - 'default-ansible-version': vs.Any(str, float), + 'default-ansible-version': vs.Any(str, float, int), 'access-rules': to_list(str), 'admin-rules': to_list(str), 'semaphores': to_list(str), @@ -1782,8 +1818,8 @@ class TenantParser(object): # Set default ansible version default_ansible_version = conf.get('default-ansible-version') if default_ansible_version is not None: - # The ansible version can be interpreted as float by yaml so make - # sure it's a string. 
+ # The ansible version can be interpreted as float or int + # by yaml so make sure it's a string. default_ansible_version = str(default_ansible_version) ansible_manager.requestVersion(default_ansible_version) else: @@ -2223,6 +2259,12 @@ class TenantParser(object): job.source_context.branch) with self.unparsed_config_cache.writeLock( job.source_context.project_canonical_name): + # Prevent files cache ltime from going backward + if files_cache.ltime >= job.ltime: + self.log.info( + "Discarding job %s result since the files cache was " + "updated in the meantime", job) + continue # Since the cat job returns all required config files # for ALL tenants the project is a part of, we can # clear the whole cache and then populate it with the diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 0a1f0ee61..276365e1d 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -1643,7 +1643,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): def getInfoRefs(self, project: Project) -> Dict[str, str]: try: - data = self._uploadPack(project) + # Encode the UTF-8 data back to a byte array, as the size of + # each record in the pack is in bytes, and so the slicing must + # also be done on a byte-basis. + data = self._uploadPack(project).encode("utf-8") except Exception: self.log.error("Cannot get references from %s" % project) raise # keeps error information @@ -1662,7 +1665,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): plen -= 4 if len(data) - i < plen: raise Exception("Invalid data in info/refs") - line = data[i:i + plen] + # Once the pack data is sliced, we can safely decode it back + # into a (UTF-8) string. 
+ line = data[i:i + plen].decode("utf-8") i += plen if not read_advertisement: read_advertisement = True diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 631d98998..182c83bae 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -1461,6 +1461,7 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): log.debug("Change %s is currently being updated, " "waiting for it to finish", change) with lock: + change = self._change_cache.get(change_key) log.debug('Finished updating change %s', change) return change diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index 40cac241a..1515db8df 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -607,17 +607,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): return change project = self.source.getProject(change_key.project_name) if not change: - if not event: - self.log.error("Change %s not found in cache and no event", - change_key) - if event: - url = event.change_url change = MergeRequest(project.name) change.project = project change.number = number # patch_number is the tips commit SHA of the MR change.patchset = change_key.revision - change.url = url or self.getMRUrl(project.name, number) + change.url = self.getMRUrl(project.name, number) change.uris = [change.url.split('://', 1)[-1]] # remove scheme log.debug("Getting change mr#%s from project %s" % ( diff --git a/zuul/driver/sql/alembic/env.py b/zuul/driver/sql/alembic/env.py index da7b3207f..17b67805e 100644 --- a/zuul/driver/sql/alembic/env.py +++ b/zuul/driver/sql/alembic/env.py @@ -53,7 +53,8 @@ def run_migrations_online(): connectable = engine_from_config( config.get_section(config.config_ini_section), prefix='sqlalchemy.', - poolclass=pool.NullPool) + poolclass=pool.NullPool, + future=True) # we can get the table prefix via the tag object tag = context.get_tag_argument() diff --git a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py index 67581a6f9..1735d35f3 100644 --- a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py +++ b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py @@ -24,13 +24,16 @@ def upgrade(table_prefix=''): connection = op.get_bind() connection.execute( - """ - UPDATE {buildset_table} - SET result=( - SELECT CASE score - WHEN 1 THEN 'SUCCESS' - ELSE 'FAILURE' END) - """.format(buildset_table=table_prefix + BUILDSET_TABLE)) + sa.text( + """ + UPDATE {buildset_table} + SET result=( + SELECT CASE score + WHEN 1 THEN 'SUCCESS' + ELSE 'FAILURE' END) + """.format(buildset_table=table_prefix + BUILDSET_TABLE) + ) + ) op.drop_column(table_prefix + BUILDSET_TABLE, 'score') diff --git a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py index abfba7247..99d12d750 100644 --- a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py +++ b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py @@ -34,13 +34,16 @@ def upgrade(table_prefix=''): connection = op.get_bind() connection.execute( - """ - UPDATE {buildset_table} - SET updated=greatest( - coalesce(first_build_start_time, '1970-01-01 00:00:00'), - coalesce(last_build_end_time, '1970-01-01 00:00:00'), - coalesce(event_timestamp, '1970-01-01 00:00:00')) - 
""".format(buildset_table=table_prefix + "zuul_buildset")) + sa.text( + """ + UPDATE {buildset_table} + SET updated=greatest( + coalesce(first_build_start_time, '1970-01-01 00:00:00'), + coalesce(last_build_end_time, '1970-01-01 00:00:00'), + coalesce(event_timestamp, '1970-01-01 00:00:00')) + """.format(buildset_table=table_prefix + "zuul_buildset") + ) + ) def downgrade(): diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py index b89653bba..2d5c39ec3 100644 --- a/zuul/driver/sql/sqlconnection.py +++ b/zuul/driver/sql/sqlconnection.py @@ -308,27 +308,31 @@ class SQLConnection(BaseConnection): def _migrate(self, revision='head'): """Perform the alembic migrations for this connection""" + # Note that this method needs to be called with an external lock held. + # The reason for this is we retrieve the alembic version and run the + # alembic migrations in different database transactions which opens + # us to races without an external lock. with self.engine.begin() as conn: context = alembic.migration.MigrationContext.configure(conn) current_rev = context.get_current_revision() - self.log.debug('Current migration revision: %s' % current_rev) - - config = alembic.config.Config() - config.set_main_option("script_location", - "zuul:driver/sql/alembic") - config.set_main_option("sqlalchemy.url", - self.connection_config.get('dburi'). - replace('%', '%%')) - - # Alembic lets us add arbitrary data in the tag argument. We can - # leverage that to tell the upgrade scripts about the table prefix. - tag = {'table_prefix': self.table_prefix} - - if current_rev is None and not self.force_migrations: - self.metadata.create_all(self.engine) - alembic.command.stamp(config, revision, tag=tag) - else: - alembic.command.upgrade(config, revision, tag=tag) + self.log.debug('Current migration revision: %s' % current_rev) + + config = alembic.config.Config() + config.set_main_option("script_location", + "zuul:driver/sql/alembic") + config.set_main_option("sqlalchemy.url", + self.connection_config.get('dburi'). + replace('%', '%%')) + + # Alembic lets us add arbitrary data in the tag argument. We can + # leverage that to tell the upgrade scripts about the table prefix. 
+ tag = {'table_prefix': self.table_prefix} + + if current_rev is None and not self.force_migrations: + self.metadata.create_all(self.engine) + alembic.command.stamp(config, revision, tag=tag) + else: + alembic.command.upgrade(config, revision, tag=tag) def onLoad(self, zk_client, component_registry=None): safe_connection = quote_plus(self.connection_name) diff --git a/zuul/lib/repl.py b/zuul/lib/repl.py index ecefae9ea..63a800406 100644 --- a/zuul/lib/repl.py +++ b/zuul/lib/repl.py @@ -26,14 +26,14 @@ class ThreadLocalProxy(object): self.default = default def __getattr__(self, name): - obj = self.files.get(threading.currentThread(), self.default) + obj = self.files.get(threading.current_thread(), self.default) return getattr(obj, name) def register(self, obj): - self.files[threading.currentThread()] = obj + self.files[threading.current_thread()] = obj def unregister(self): - self.files.pop(threading.currentThread()) + self.files.pop(threading.current_thread()) class REPLHandler(socketserver.StreamRequestHandler): diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index bf49737dd..e87e553d3 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -28,6 +28,8 @@ from zuul.model import ( ) from zuul.zk.change_cache import ChangeKey from zuul.zk.components import COMPONENT_REGISTRY +from zuul.zk.exceptions import LockException +from zuul.zk.locks import pipeline_lock from opentelemetry import trace @@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta): def _postConfig(self): layout = self.pipeline.tenant.layout self.buildChangeQueues(layout) - with self.sched.createZKContext(None, self.log) as ctx,\ - self.currentContext(ctx): - # Make sure we have state and change list objects, and - # ensure that they exist in ZK. We don't hold the - # pipeline lock, but if they don't exist, that means they - # are new, so no one else will either, so the write on - # create is okay. If they do exist and we have an old - # object, we'll just reuse it. If it does exist and we - # don't have an old object, we'll get a new empty one. - # Regardless, these will not automatically refresh now, so - # they will be out of date until they are refreshed later. - self.pipeline.state = PipelineState.create( - self.pipeline, layout.uuid, self.pipeline.state) - self.pipeline.change_list = PipelineChangeList.create( - self.pipeline) + # Make sure we have state and change list objects. We + # don't actually ensure they exist in ZK here; these are + # just local objects until they are serialized the first + # time. Since we don't hold the pipeline lock, we can't + # reliably perform any read or write operations; we just + # need to ensure we have in-memory objects to work with + # and they will be initialized or loaded on the next + # refresh. + + # These will be out of date until they are refreshed later. + self.pipeline.state = PipelineState.create( + self.pipeline, self.pipeline.state) + self.pipeline.change_list = PipelineChangeList.create( + self.pipeline) + + # Now, try to acquire a non-blocking pipeline lock and refresh + # them for the side effect of initializing them if necessary. + # In the case of a new pipeline, no one else should have a + # lock anyway, and this helps us avoid emitting a whole bunch + # of errors elsewhere on startup when these objects don't + # exist. If the pipeline already exists and we can't acquire + # the lock, that's fine, we're much less likely to encounter + # read errors elsewhere in that case anyway. 
+ try: + with pipeline_lock( + self.sched.zk_client, self.pipeline.tenant.name, + self.pipeline.name, blocking=False) as lock,\ + self.sched.createZKContext(lock, self.log) as ctx,\ + self.currentContext(ctx): + if not self.pipeline.state.exists(ctx): + # We only do this if the pipeline doesn't exist in + # ZK because in that case, this process should be + # fast since it's empty. If it does exist, + # refreshing it may be slow and since other actors + # won't encounter errors due to its absence, we + # would rather defer the work to later. + self.pipeline.state.refresh(ctx) + self.pipeline.change_list.refresh(ctx) + except LockException: + pass def buildChangeQueues(self, layout): self.log.debug("Building relative_priority queues") @@ -610,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta): if enqueue_time: item.enqueue_time = enqueue_time item.live = live - self.reportStats(item, added=True) + self.reportStats(item, trigger_event=event) item.quiet = quiet if item.live: @@ -2204,7 +2231,7 @@ class PipelineManager(metaclass=ABCMeta): log.error("Reporting item %s received: %s", item, ret) return action, (not ret) - def reportStats(self, item, added=False): + def reportStats(self, item, trigger_event=None): if not self.sched.statsd: return try: @@ -2243,18 +2270,21 @@ class PipelineManager(metaclass=ABCMeta): if dt: self.sched.statsd.timing(key + '.resident_time', dt) self.sched.statsd.incr(key + '.total_changes') - if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'): + if ( + trigger_event + and hasattr(trigger_event, 'arrived_at_scheduler_timestamp') + ): now = time.time() - arrived = item.event.arrived_at_scheduler_timestamp + arrived = trigger_event.arrived_at_scheduler_timestamp processing = (now - arrived) * 1000 - elapsed = (now - item.event.timestamp) * 1000 + elapsed = (now - trigger_event.timestamp) * 1000 self.sched.statsd.timing( basekey + '.event_enqueue_processing_time', processing) self.sched.statsd.timing( basekey + '.event_enqueue_time', elapsed) self.reportPipelineTiming('event_enqueue_time', - item.event.timestamp) + trigger_event.timestamp) except Exception: self.log.exception("Exception reporting pipeline stats") diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index 1e2de27b8..e4688a1b7 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -333,6 +333,26 @@ class Repo(object): os.rmdir(root) @staticmethod + def _cleanup_leaked_rebase_dirs(local_path, log, messages): + for rebase_dir in [".git/rebase-merge", ".git/rebase-apply"]: + leaked_dir = os.path.join(local_path, rebase_dir) + if not os.path.exists(leaked_dir): + continue + if log: + log.debug("Cleaning leaked %s dir", leaked_dir) + else: + messages.append( + f"Cleaning leaked {leaked_dir} dir") + try: + shutil.rmtree(leaked_dir) + except Exception as exc: + msg = f"Failed to remove leaked {leaked_dir} dir:" + if log: + log.exception(msg) + else: + messages.append(f"{msg}\n{exc}") + + @staticmethod def refNameToZuulRef(ref_name: str) -> str: return "refs/zuul/{}".format( hashlib.sha1(ref_name.encode("utf-8")).hexdigest() @@ -384,6 +404,8 @@ class Repo(object): messages.append("Delete stale Zuul ref {}".format(ref)) Repo._deleteRef(ref.path, repo) + Repo._cleanup_leaked_rebase_dirs(local_path, log, messages) + # Note: Before git 2.13 deleting a a ref foo/bar leaves an empty # directory foo behind that will block creating the reference foo # in the future. 
As a workaround we must clean up empty directories @@ -615,7 +637,11 @@ class Repo(object): self.fetch(ref, zuul_event_id=zuul_event_id) log.debug("Rebasing %s with args %s", ref, args) repo.git.checkout('FETCH_HEAD') - repo.git.rebase(*args) + try: + repo.git.rebase(*args) + except Exception: + repo.git.rebase(abort=True) + raise return repo.head.commit def fetch(self, ref, zuul_event_id=None): diff --git a/zuul/model.py b/zuul/model.py index 8e0ae4eee..e526b749c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject): _read_only=False, ) + def _lateInitData(self): + # If we're initializing the object on our initial refresh, + # reset the data to this. + return dict( + state=Pipeline.STATE_NORMAL, + queues=[], + old_queues=[], + consecutive_failures=0, + disabled=False, + layout_uuid=self.pipeline.tenant.layout.uuid, + ) + @classmethod def fromZK(klass, context, path, pipeline, **kw): obj = klass() @@ -631,21 +643,23 @@ class PipelineState(zkobject.ZKObject): return obj @classmethod - def create(cls, pipeline, layout_uuid, old_state=None): - # If the object does not exist in ZK, create it with the - # default attributes and the supplied layout UUID. Otherwise, - # return an initialized object (or the old object for reuse) - # without loading any data so that data can be loaded on the - # next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline, old_state=None): + # If we are resetting an existing pipeline, we will have an + # old_state, so just clean up the object references there and + # let the next refresh handle updating any data. + if old_state: + old_state._resetObjectRefs() + return old_state + + # Otherwise, we are initializing a pipeline that we haven't + # seen before. It still might exist in ZK, but since we + # haven't seen it, we don't have any object references to + # clean up. We can just start with a clean object, set the + # pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. state = cls() state._set(pipeline=pipeline) - if state.exists(ctx): - if old_state: - old_state._resetObjectRefs() - return old_state - return state - return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) + return state def _resetObjectRefs(self): # Update the pipeline references on the queue objects. @@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject): # This is so that we can refresh the object in circumstances # where we haven't verified that our local layout matches # what's in ZK. + + # Notably, this need not prevent us from performing the + # initialization below if necessary. The case of the object + # being brand new in ZK supersedes our worry that our old copy + # might be out of date since our old copy is, itself, brand + # new. self._set(_read_only=read_only) - return super().refresh(context) + try: + return super().refresh(context) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and we + # should write it to ZK. + + # Note that typically this code is not used since + # currently other objects end up creating the pipeline + # path in ZK first. It is included in case that ever + # changes.
Currently the empty byte-string code path in + # deserialize() is used instead. + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self._set(**self._lateInitData()) + self.internalCreate(context) def deserialize(self, raw, context): # We may have old change objects in the pipeline cache, so # source change cache. self.pipeline.manager.clearCache() + # If the object doesn't exist we will get back an empty byte + # string. This happens because the postConfig call creates + # this object without holding the pipeline lock, so it can't + # determine whether or not it exists in ZK. We do hold the + # pipeline lock here, so if we get the empty byte string, we + # know we're initializing the object. In that case, we should + # initialize the layout id to the current layout. Nothing + # else needs to be set. + if raw == b'': + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + return self._lateInitData() + data = super().deserialize(raw, context) if not self._read_only: @@ -898,9 +952,31 @@ class PipelineChangeList(zkobject.ShardedZKObject): _change_keys=[], ) - def refresh(self, context): - self._retry(context, super().refresh, - context, max_tries=5) + def refresh(self, context, allow_init=True): + # Set allow_init to false to indicate that we don't hold the + # lock and we should not try to initialize the object in ZK if + # it does not exist. + try: + self._retry(context, super().refresh, + context, max_tries=5) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and + # we should write it to ZK. + if allow_init: + context.log.warning( + "Initializing pipeline change list for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self.internalCreate(context) + else: + # If we're called from a context where we can't + # initialize the change list, re-raise the exception. + raise def getPath(self): return self.getChangeListPath(self.pipeline) @@ -911,19 +987,14 @@ return pipeline_path + '/change_list' @classmethod - def create(cls, pipeline, old_list=None): - # If the object does not exist in ZK, create it with the - # default attributes. Otherwise, return an initialized object - # (or the old object for reuse) without loading any data so - # that data can be loaded on the next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline): + # This object may or may not exist in ZK, but we aren't using + # any of that data here. We can just start with a clean object, + # set the pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK.
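The create()/refresh split above is a general late-initialization scheme: create() only builds an in-memory object, and the first locked refresh either loads the ZK node or writes the initial data; an unlocked refresh must instead propagate the error. A rough sketch of the same flow against a bare kazoo client, with invented names (LazyNode, zk), follows:

    import json

    from kazoo.exceptions import NoNodeError

    class LazyNode:
        # Invented stand-in for a ZK-backed object; only the refresh
        # flow from the diff above is mirrored here.
        def __init__(self, path):
            self.path = path
            self.data = {}

        def refresh(self, zk, allow_init=True):
            # zk is assumed to be a connected kazoo.client.KazooClient.
            try:
                raw, _ = zk.get(self.path)
                self.data = json.loads(raw)
            except NoNodeError:
                if not allow_init:
                    # Caller does not hold the lock; creating the node
                    # here could race another scheduler, so propagate.
                    raise
                # Caller holds the lock, so it is safe to create the
                # node with initial contents instead of failing.
                zk.create(self.path, json.dumps(self.data).encode("utf8"),
                          makepath=True)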
change_list = cls() change_list._set(pipeline=pipeline) - if change_list.exists(ctx): - if old_list: - return old_list - return change_list - return cls.new(ctx, pipeline=pipeline) + return change_list def serialize(self, context): data = { @@ -931,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject): } return json.dumps(data, sort_keys=True).encode("utf8") - def deserialize(self, data, context): - data = super().deserialize(data, context) + def deserialize(self, raw, context): + data = super().deserialize(raw, context) change_keys = [] # We must have a dictionary with a 'changes' key; otherwise we # may be reading immediately after truncating. Allow the @@ -4378,9 +4449,8 @@ class BuildSet(zkobject.ZKObject): version = build.getZKVersion() # If zstat is None, we created the object if version is not None: - versions = self.build_versions.copy() - versions[build.uuid] = version + 1 - self.updateAttributes(context, build_versions=versions) + self.build_versions[build.uuid] = version + 1 + self.updateAttributes(context, build_versions=self.build_versions) def updateJobVersion(self, context, job): if (COMPONENT_REGISTRY.model_api < 12): @@ -4388,9 +4458,8 @@ class BuildSet(zkobject.ZKObject): version = job.getZKVersion() if version is not None: - versions = self.job_versions.copy() - versions[job.name] = version + 1 - self.updateAttributes(context, job_versions=versions) + self.job_versions[job.name] = version + 1 + self.updateAttributes(context, job_versions=self.job_versions) def shouldRefreshBuild(self, build): # Unless all schedulers are updating versions, we can't trust @@ -4398,19 +4467,15 @@ class BuildSet(zkobject.ZKObject): if (COMPONENT_REGISTRY.model_api < 12): return True current = build.getZKVersion() - if current is None: - current = -1 expected = self.build_versions.get(build.uuid, 0) - return expected > current + return expected != current def shouldRefreshJob(self, job): if (COMPONENT_REGISTRY.model_api < 12): return True current = job.getZKVersion() - if current is None: - current = -1 expected = self.job_versions.get(job.name, 0) - return expected > current + return expected != current @property def ref(self): @@ -4601,6 +4666,37 @@ class BuildSet(zkobject.ZKObject): return Attributes(uuid=self.uuid) +class EventInfo: + + def __init__(self): + self.zuul_event_id = None + self.timestamp = time.time() + self.span_context = None + + @classmethod + def fromEvent(cls, event): + tinfo = cls() + tinfo.zuul_event_id = event.zuul_event_id + tinfo.timestamp = event.timestamp + tinfo.span_context = event.span_context + return tinfo + + @classmethod + def fromDict(cls, d): + tinfo = cls() + tinfo.zuul_event_id = d["zuul_event_id"] + tinfo.timestamp = d["timestamp"] + tinfo.span_context = d["span_context"] + return tinfo + + def toDict(self): + return { + "zuul_event_id": self.zuul_event_id, + "timestamp": self.timestamp, + "span_context": self.span_context, + } + + class QueueItem(zkobject.ZKObject): """Represents the position of a Change in a ChangeQueue. 
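The EventInfo class added above deliberately keeps only the three event fields a queue item still needs after enqueue. A quick illustrative round-trip, using a made-up FakeTriggerEvent with just the attributes fromEvent() reads, shows that the summary survives the JSON serialization used for ZK storage:

    import json
    import time

    class FakeTriggerEvent:
        # Hypothetical stand-in for a real trigger event.
        zuul_event_id = "a1b2c3"
        timestamp = time.time()
        span_context = None

    info = EventInfo.fromEvent(FakeTriggerEvent())
    restored = EventInfo.fromDict(json.loads(json.dumps(info.toDict())))
    assert restored.zuul_event_id == "a1b2c3"

Storing this small summary instead of the full trigger event is what the model API bump to version 13 later in this diff records.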
@@ -4635,7 +4731,7 @@ class QueueItem(zkobject.ZKObject): live=True, # Whether an item is intended to be processed at all layout_uuid=None, _cached_sql_results={}, - event=None, # The trigger event that lead to this queue item + event=None, # Info about the event that led to this queue item # Additional container for connection specific information to be # used by reporters throughout the lifecycle @@ -4657,6 +4753,9 @@ def new(klass, context, **kw): obj = klass() obj._set(**kw) + if COMPONENT_REGISTRY.model_api >= 13: + obj._set(event=obj.event and EventInfo.fromEvent(obj.event)) + data = obj._trySerialize(context) obj._save(context, data, create=True) files_state = (BuildSet.COMPLETE if obj.change.files is not None @@ -4685,10 +4784,18 @@ return (tenant, pipeline, uuid) def serialize(self, context): - if isinstance(self.event, TriggerEvent): - event_type = "TriggerEvent" + if COMPONENT_REGISTRY.model_api < 13: + if isinstance(self.event, TriggerEvent): + event_type = "TriggerEvent" + else: + event_type = self.event.__class__.__name__ else: - event_type = self.event.__class__.__name__ + event_type = "EventInfo" + if not isinstance(self.event, EventInfo): + # Convert our local trigger event to an EventInfo + # object. This will only happen on the transition to + # model API version 13. + self._set(event=EventInfo.fromEvent(self.event)) data = { "uuid": self.uuid, @@ -4730,14 +4837,18 @@ # child objects. self._set(uuid=data["uuid"]) - event_type = data["event"]["type"] - if event_type == "TriggerEvent": - event_class = ( - self.pipeline.manager.sched.connections.getTriggerEventClass( - data["event"]["data"]["driver_name"]) - ) + if COMPONENT_REGISTRY.model_api < 13: + event_type = data["event"]["type"] + if event_type == "TriggerEvent": + event_class = ( + self.pipeline.manager.sched.connections + .getTriggerEventClass( + data["event"]["data"]["driver_name"]) + ) + else: + event_class = EventTypeIndex.event_type_mapping.get(event_type) else: - event_class = EventTypeIndex.event_type_mapping.get(event_type) + event_class = EventInfo if event_class is None: raise NotImplementedError( @@ -5959,8 +6070,7 @@ class Bundle: def deserialize(cls, context, queue, items_by_path, data): bundle = cls(data["uuid"]) bundle.items = [ - items_by_path.get(p) or QueueItem.fromZK( - context, p, pipeline=queue.pipeline, queue=queue) + items_by_path.get(p) or QueueItem.fromZK(context, p, queue=queue) for p in data["items"] ] bundle.started_reporting = data["started_reporting"] diff --git a/zuul/model_api.py b/zuul/model_api.py index ccb12077d..0244296dd 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -14,4 +14,4 @@ # When making ZK schema changes, increment this and add a record to # doc/source/developer/model-changelog.rst -MODEL_API = 12 +MODEL_API = 13 diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 2b78b4c84..cd15a878c 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1528,7 +1528,7 @@ class Scheduler(threading.Thread): with self.createZKContext(lock, self.log) as ctx: if tenant is not None: self._reconfigureTenant(ctx, min_ltimes, - -1, + event.zuul_event_ltime, tenant, old_tenant) else: self._reconfigureDeleteTenant(ctx, old_tenant) @@ -1548,7 +1548,6 @@ class Scheduler(threading.Thread): # This is called in the scheduler loop after another thread submits # a request if self.unparsed_abide.ltime < self.system_config_cache.ltime: - self.log.debug("Updating system
config") self.updateSystemConfig() with self.layout_lock: @@ -2126,70 +2125,74 @@ class Scheduler(threading.Thread): return self.log.debug("Run handler awake") self.run_handler_lock.acquire() - try: - if not self._stopped: - self.process_reconfigure_queue() + with self.statsd_timer("zuul.scheduler.run_handler"): + try: + self._run() + except Exception: + self.log.exception("Exception in run handler:") + # There may still be more events to process + self.wake_event.set() + finally: + self.run_handler_lock.release() - if self.unparsed_abide.ltime < self.system_config_cache.ltime: - self.updateSystemConfig() + def _run(self): + if not self._stopped: + self.process_reconfigure_queue() - for tenant_name in self.unparsed_abide.tenants: - if self._stopped: - break + if self.unparsed_abide.ltime < self.system_config_cache.ltime: + self.updateSystemConfig() - tenant = self.abide.tenants.get(tenant_name) - if not tenant: - continue + for tenant_name in self.unparsed_abide.tenants: + if self._stopped: + break - # This will also forward events for the pipelines - # (e.g. enqueue or dequeue events) to the matching - # pipeline event queues that are processed afterwards. - self.process_tenant_management_queue(tenant) + tenant = self.abide.tenants.get(tenant_name) + if not tenant: + continue - if self._stopped: - break + # This will also forward events for the pipelines + # (e.g. enqueue or dequeue events) to the matching + # pipeline event queues that are processed afterwards. + self.process_tenant_management_queue(tenant) - try: - with tenant_read_lock( - self.zk_client, tenant_name, blocking=False - ) as tlock: - if not self.isTenantLayoutUpToDate(tenant_name): - continue + if self._stopped: + break - # Get tenant again, as it might have been updated - # by a tenant reconfig or layout change. - tenant = self.abide.tenants[tenant_name] + try: + with tenant_read_lock( + self.zk_client, tenant_name, blocking=False + ) as tlock: + if not self.isTenantLayoutUpToDate(tenant_name): + continue - if not self._stopped: - # This will forward trigger events to pipeline - # event queues that are processed below. - self.process_tenant_trigger_queue(tenant) + # Get tenant again, as it might have been updated + # by a tenant reconfig or layout change. + tenant = self.abide.tenants[tenant_name] - self.process_pipelines(tenant, tlock) - except LockException: - self.log.debug("Skipping locked tenant %s", - tenant.name) - remote_state = self.tenant_layout_state.get( - tenant_name) - local_state = self.local_layout_state.get( - tenant_name) - if (remote_state is None or - local_state is None or - remote_state > local_state): - # Let's keep looping until we've updated to the - # latest tenant layout. - self.wake_event.set() - except Exception: - self.log.exception("Exception processing tenant %s:", - tenant_name) - # There may still be more events to process - self.wake_event.set() + if not self._stopped: + # This will forward trigger events to pipeline + # event queues that are processed below. + self.process_tenant_trigger_queue(tenant) + + self.process_pipelines(tenant, tlock) + except LockException: + self.log.debug("Skipping locked tenant %s", + tenant.name) + remote_state = self.tenant_layout_state.get( + tenant_name) + local_state = self.local_layout_state.get( + tenant_name) + if (remote_state is None or + local_state is None or + remote_state > local_state): + # Let's keep looping until we've updated to the + # latest tenant layout. 
+ self.wake_event.set() except Exception: - self.log.exception("Exception in run handler:") + self.log.exception("Exception processing tenant %s:", + tenant_name) # There may still be more events to process self.wake_event.set() - finally: - self.run_handler_lock.release() def primeSystemConfig(self): with self.layout_lock: @@ -2207,6 +2210,7 @@ class Scheduler(threading.Thread): def updateSystemConfig(self): with self.layout_lock: + self.log.debug("Updating system config") self.unparsed_abide, self.globals = self.system_config_cache.get() self.ansible_manager = AnsibleManager( default_version=self.globals.default_ansible_version) @@ -2241,7 +2245,12 @@ self.zk_client, tenant.name, pipeline.name, blocking=False) as lock,\ self.createZKContext(lock, self.log) as ctx: + self.log.debug("Processing pipeline %s in tenant %s", + pipeline.name, tenant.name) with pipeline.manager.currentContext(ctx): + if ((tenant.name, pipeline.name) in + self._profile_pipelines): + ctx.profile = True with self.statsd_timer(f'{stats_key}.handling'): refreshed = self._process_pipeline( tenant, pipeline) @@ -2310,14 +2319,10 @@ stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}' ctx = pipeline.manager.current_context - if (tenant.name, pipeline.name) in self._profile_pipelines: - ctx.profile = True with self.statsd_timer(f'{stats_key}.refresh'): pipeline.change_list.refresh(ctx) pipeline.summary.refresh(ctx) pipeline.state.refresh(ctx) - if (tenant.name, pipeline.name) in self._profile_pipelines: - ctx.profile = False pipeline.state.setDirty(self.zk_client.client) if pipeline.state.old_queues: @@ -2354,7 +2359,9 @@ for pipeline in tenant.layout.pipelines.values(): self.log.debug("Gather relevant cache items for: %s %s", tenant.name, pipeline.name) - pipeline.change_list.refresh(ctx) + # This will raise an exception and abort the process if + # unable to refresh the change list. + pipeline.change_list.refresh(ctx, allow_init=False) change_keys = pipeline.change_list.getChangeKeys() relevant_changes = pipeline.manager.resolveChangeKeys( change_keys) @@ -2380,11 +2387,21 @@ with trigger_queue_lock( self.zk_client, tenant.name, blocking=False ): + self.log.debug("Processing tenant trigger events in %s", + tenant.name) # Update the pipeline changes ctx = self.createZKContext(None, self.log) for pipeline in tenant.layout.pipelines.values(): + # This will raise an exception if it is unable to + # refresh the change list. We will proceed anyway + # and use our data from the last successful + # refresh in order to avoid stalling trigger + # processing. In this case we may not forward + # some events which are related to changes in the + # pipeline but don't match the pipeline trigger + # criteria.
try: - pipeline.change_list.refresh(ctx) + pipeline.change_list.refresh(ctx, allow_init=False) except Exception: self.log.exception( "Unable to refresh pipeline change list for %s", @@ -2597,6 +2614,8 @@ class Scheduler(threading.Thread): "Skipping management event queue for tenant %s", tenant.name) return + self.log.debug("Processing tenant management events in %s", + tenant.name) self._process_tenant_management_queue(tenant) except LockException: self.log.debug("Skipping locked management event queue" diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py index 01a683b23..c2487af88 100644 --- a/zuul/source/__init__.py +++ b/zuul/source/__init__.py @@ -97,7 +97,7 @@ class BaseSource(object, metaclass=abc.ABCMeta): # info on subsequent requests we can continue to do the # requested job work. try: - dep = self.getChangeByURL(url, event) + return self.getChangeByURL(url, event) except Exception: # Note that if the change isn't found, None is returned. # We do not raise in that case and do not need to handle it time.sleep(1) else: raise - return dep @abc.abstractmethod def getChangesDependingOn(self, change, projects, tenant): diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index b228ecaa4..87d76bca6 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -233,7 +233,18 @@ class ZKObject: obj._load(context, path=path) return obj + def internalCreate(self, context): + """Create the object in ZK from an existing ZKObject + + This should only be used in special circumstances: when we + know it's safe to start using a ZKObject before it's actually + created in ZK. Normally use .new(). + """ + data = self._trySerialize(context) + self._save(context, data, create=True) + def refresh(self, context): + """Update data from ZK""" self._load(context)
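The source/__init__.py change above fixes the retry loop so a successful lookup returns immediately instead of assigning a variable and falling through to another attempt. The same shape in a generic, self-contained form (fetch_with_retries and its arguments are invented for illustration):

    import time

    def fetch_with_retries(fetch, attempts=3):
        for i in range(attempts):
            try:
                # Returning directly exits on the first success; the old
                # assign-then-return-after-the-loop shape kept retrying
                # even after the call had succeeded.
                return fetch()
            except Exception:
                if i == attempts - 1:
                    raise
                time.sleep(1)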