diff options
37 files changed, 939 insertions, 130 deletions
diff --git a/doc/source/developer/index.rst b/doc/source/developer/index.rst index 52266a175..b45c75640 100644 --- a/doc/source/developer/index.rst +++ b/doc/source/developer/index.rst @@ -14,6 +14,7 @@ Zuul, though advanced users may find it interesting. drivers triggers testing + metrics docs ansible javascript diff --git a/doc/source/developer/metrics.rst b/doc/source/developer/metrics.rst new file mode 100644 index 000000000..913a591ba --- /dev/null +++ b/doc/source/developer/metrics.rst @@ -0,0 +1,74 @@ +:title: Metrics + +Metrics +======= + +Event Overview +-------------- + +The following table illustrates the event and pipeline processing +sequence as it relates to some of the metrics described in +:ref:`statsd`. This is intended as general guidance only and is not +an exhaustive list. + ++----------------------------------------+------+------+------+--------------------------------------+ +| Event | Metrics | Attribute | ++========================================+======+======+======+======================================+ +| Event generated by source | | | | event.timestamp | ++----------------------------------------+------+ + +--------------------------------------+ +| Enqueued into driver queue | | | | | ++----------------------------------------+------+ + +--------------------------------------+ +| Enqueued into tenant trigger queue | | | | event.arrived_at_scheduler_timestamp | ++----------------------------------------+ + [8] + +--------------------------------------+ +| Forwarded to matching pipelines | [1] | | | | ++----------------------------------------+ + + +--------------------------------------+ +| Changes enqueued ahead | | | | | ++----------------------------------------+ + + +--------------------------------------+ +| Change enqueued | | | | item.enqueue_time | ++----------------------------------------+------+------+ +--------------------------------------+ +| Changes enqueued behind | | | | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Set item configuration | | | | build_set.configured_time | ++----------------------------------------+------+------+ +--------------------------------------+ +| Request files changed (if needed) | | | | | ++----------------------------------------+ +------+ +--------------------------------------+ +| Request merge | [2] | | | | ++----------------------------------------+ +------+ +--------------------------------------+ +| Wait for merge (and files if needed) | | | [9] | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Generate dynamic layout (if needed) | [3] | | | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Freeze job graph | [4] | | | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Request global repo state (if needed) | | | | build_set.repo_state_request_time | ++----------------------------------------+ [5] +------+ +--------------------------------------+ +| Wait for global repo state (if needed) | | | | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Deduplicate jobs | | | | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Acquire semaphore (non-resources-first)| | | | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Request nodes | | | | request.created_time | ++----------------------------------------+ [6] +------+ +--------------------------------------+ +| Wait for nodes | | | | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Acquire semaphore (resources-first) | | | | | ++----------------------------------------+------+------+ +--------------------------------------+ +| Enqueue build request | | | | build.execute_time | ++----------------------------------------+ [7] +------+ +--------------------------------------+ +| Executor starts job | | | | build.start_time | ++----------------------------------------+------+------+------+--------------------------------------+ + +====== ============================= +Metric Name +====== ============================= +1 event_enqueue_processing_time +2 merge_request_time +3 layout_generation_time +4 job_freeze_time +5 repo_state_time +6 node_request_time +7 job_wait_time +8 event_enqueue_time +9 event_job_time +====== ============================= diff --git a/doc/source/drivers/timer.rst b/doc/source/drivers/timer.rst index ff50b10ba..1d7931c5e 100644 --- a/doc/source/drivers/timer.rst +++ b/doc/source/drivers/timer.rst @@ -14,9 +14,9 @@ Timers don't require a special connection or driver. Instead they can simply be used by listing ``timer`` as the trigger. This trigger will run based on a cron-style time specification. It -will enqueue an event into its pipeline for every project defined in -the configuration. Any job associated with the pipeline will run in -response to that event. +will enqueue an event into its pipeline for every project and branch +defined in the configuration. Any job associated with the pipeline +will run in response to that event. .. attr:: pipeline.trigger.timer @@ -27,9 +27,9 @@ response to that event. The time specification in cron syntax. Only the 5 part syntax is supported, not the symbolic names. Example: ``0 0 * * *`` - runs at midnight. The first weekday is Monday. - An optional 6th part specifies seconds. The optional 7th part - specifies a jitter in seconds. This advances or delays the - trigger randomly, limited by the specified value. - Example ``0 0 * * * * 60`` runs at midnight with a +/- 60 - seconds jitter. + runs at midnight. The first weekday is Monday. An optional 6th + part specifies seconds. The optional 7th part specifies a + jitter in seconds. This delays the trigger randomly, limited by + the specified value. Example ``0 0 * * * * 60`` runs at + midnight or randomly up to 60 seconds later. The jitter is + applied individually to each project-branch combination. diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst index 0c2cb4351..1cb61ee01 100644 --- a/doc/source/monitoring.rst +++ b/doc/source/monitoring.rst @@ -110,7 +110,27 @@ These metrics are emitted by the Zuul :ref:`scheduler`: operation(s). This will always include a request to a Zuul merger to speculatively merge the change, but it may also include a second request submitted in parallel to identify - the files altered by the change. + the files altered by the change. Includes + :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merger_merge_op_time` + and + :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merger_files_changes_op_time`. + + .. stat:: merger_merge_op_time + :type: timer + + The amount of time the merger spent performing a merge + operation. This does not include any of the round-trip time + from the scheduler to the merger, or any other merge + operations. + + .. stat:: merger_files_changes_op_time + :type: timer + + The amount of time the merger spent performing a files-changes + operation to detect changed files (this is sometimes + performed if the source does not provide this information). + This does not include any of the round-trip time from the + scheduler to the merger, or any other merge operations. .. stat:: layout_generation_time :type: timer @@ -128,7 +148,17 @@ These metrics are emitted by the Zuul :ref:`scheduler`: The amount of time waiting for a secondary Zuul merger operation to collect additional information about the repo - state of required projects. + state of required projects. Includes + :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merger_repo_state_op_time`. + + .. stat:: merger_repo_state_op_time + :type: timer + + The amount of time the merger spent performing a repo state + operation to collect additional information about the repo + state of required projects. This does not include any of the + round-trip time from the scheduler to the merger, or any + other merge operations. .. stat:: node_request_time :type: timer diff --git a/releasenotes/notes/timer-jitter-3d3df10d0e75f892.yaml b/releasenotes/notes/timer-jitter-3d3df10d0e75f892.yaml new file mode 100644 index 000000000..d209f4a87 --- /dev/null +++ b/releasenotes/notes/timer-jitter-3d3df10d0e75f892.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Pipeline timer triggers with jitter now apply the jitter to each + project-branch individually (instead of to the pipeline as a + whole). This can reduce the thundering herd effect on external + systems for periodic pipelines with many similar jobs. diff --git a/tests/base.py b/tests/base.py index 5a85ea0d7..cebcf2e1f 100644 --- a/tests/base.py +++ b/tests/base.py @@ -3091,6 +3091,8 @@ class FakeBuild(object): self.paused = False self.aborted = False self.requeue = False + self.should_fail = False + self.should_retry = False self.created = time.time() self.changes = None items = self.parameters['zuul']['items'] @@ -3162,6 +3164,8 @@ class FakeBuild(object): return result def shouldFail(self): + if self.should_fail: + return True changes = self.executor_server.fail_tests.get(self.name, []) for change in changes: if self.hasChanges(change): @@ -3169,6 +3173,8 @@ class FakeBuild(object): return False def shouldRetry(self): + if self.should_retry: + return True entries = self.executor_server.retry_tests.get(self.name, []) for entry in entries: if self.hasChanges(entry['change']): @@ -3662,7 +3668,7 @@ class FakeSMTP(object): class FakeNodepool(object): REQUEST_ROOT = '/nodepool/requests' NODE_ROOT = '/nodepool/nodes' - LAUNCHER_ROOT = '/nodepool/launchers' + COMPONENT_ROOT = '/nodepool/components' log = logging.getLogger("zuul.test.FakeNodepool") @@ -3726,10 +3732,11 @@ class FakeNodepool(object): self.fulfillRequest(req) def registerLauncher(self, labels=["label1"], id="FakeLauncher"): - path = os.path.join(self.LAUNCHER_ROOT, id) + path = os.path.join(self.COMPONENT_ROOT, 'pool', id) data = {'id': id, 'supported_labels': labels} self.client.create( - path, json.dumps(data).encode('utf8'), makepath=True) + path, json.dumps(data).encode('utf8'), + ephemeral=True, makepath=True, sequence=True) def getNodeRequests(self): try: diff --git a/tests/fixtures/layouts/job-dedup-false.yaml b/tests/fixtures/layouts/job-dedup-false.yaml index 2c0e6ee2e..9254f0b41 100644 --- a/tests/fixtures/layouts/job-dedup-false.yaml +++ b/tests/fixtures/layouts/job-dedup-false.yaml @@ -39,6 +39,7 @@ - job: name: common-job deduplicate: false + pre-run: playbooks/pre.yaml required-projects: - org/project1 - org/project2 diff --git a/tests/fixtures/layouts/job-dedup-noop.yaml b/tests/fixtures/layouts/job-dedup-noop.yaml new file mode 100644 index 000000000..9383fd8b6 --- /dev/null +++ b/tests/fixtures/layouts/job-dedup-noop.yaml @@ -0,0 +1,55 @@ +- queue: + name: integrated + allow-circular-dependencies: true + +- pipeline: + name: gate + manager: dependent + success-message: Build succeeded (gate). + require: + gerrit: + approval: + - Approved: 1 + trigger: + gerrit: + - event: comment-added + approval: + - Approved: 1 + success: + gerrit: + Verified: 2 + submit: true + failure: + gerrit: + Verified: -2 + start: + gerrit: + Verified: 0 + precedence: high + +- job: + name: base + parent: null + pre-run: playbooks/pre.yaml + run: playbooks/run.yaml + nodeset: + nodes: + - label: debian + name: controller + +- job: + name: common-job + required-projects: + - org/project1 + +- job: + name: project1-job + +- project: + name: org/project1 + queue: integrated + gate: + jobs: + - noop + - common-job + - project1-job diff --git a/tests/fixtures/layouts/trigger-sequence.yaml b/tests/fixtures/layouts/trigger-sequence.yaml new file mode 100644 index 000000000..31db734b1 --- /dev/null +++ b/tests/fixtures/layouts/trigger-sequence.yaml @@ -0,0 +1,75 @@ +- pipeline: + name: check + manager: independent + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + Verified: 1 + failure: + gerrit: + Verified: -1 + +- pipeline: + name: gate + manager: dependent + success-message: Build succeeded (gate). + trigger: + gerrit: + - event: comment-added + approval: + - Approved: 1 + success: + gerrit: + Verified: 2 + submit: true + failure: + gerrit: + Verified: -2 + start: + gerrit: + Verified: 0 + precedence: high + +- pipeline: + name: post + manager: independent + trigger: + gerrit: + - event: ref-updated + ref: ^(?!refs/).*$ + +- pipeline: + name: tag + manager: independent + trigger: + gerrit: + - event: ref-updated + ref: ^refs/tags/.*$ + +- job: + name: base + parent: null + run: playbooks/base.yaml + nodeset: + nodes: + - label: ubuntu-xenial + name: controller + +- job: + name: check-job + run: playbooks/check.yaml + +- job: + name: post-job + run: playbooks/post.yaml + +- project: + name: org/project + check: + jobs: + - check-job + gate: + jobs: + - check-job diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index ac5ad13f5..2223008d5 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -1703,6 +1703,151 @@ class TestGerritCircularDependencies(ZuulTestCase): ], ordered=False) self.assertEqual(len(self.fake_nodepool.history), 3) + @simple_layout('layouts/job-dedup-false.yaml') + def test_job_deduplication_false_failed_job(self): + # Test that if we are *not* deduplicating jobs, we don't + # duplicate the result on two different builds. + # The way we check that is to retry the common-job between two + # items, but only once, and only on one item. The other item + # should be unaffected. + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + + # A <-> B + A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + A.subject, B.data["url"] + ) + B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + B.subject, A.data["url"] + ) + + A.addApproval('Code-Review', 2) + B.addApproval('Code-Review', 2) + + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.fake_gerrit.addEvent(B.addApproval('Approved', 1)) + + # If we don't make sure these jobs finish first, then one of + # the items may complete before the other and cause Zuul to + # abort the project*-job on the other item (with a "bundle + # failed to merge" error). + self.waitUntilSettled() + for build in self.builds: + if build.name == 'common-job' and build.project == 'org/project1': + break + else: + raise Exception("Unable to find build") + build.should_retry = True + + # Store a reference to the queue items so we can inspect their + # internal attributes later to double check the retry build + # count is correct. + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + pipeline = tenant.layout.pipelines['gate'] + items = pipeline.getAllItems() + self.assertEqual(len(items), 2) + + self.executor_server.release('project1-job') + self.executor_server.release('project2-job') + self.waitUntilSettled() + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'MERGED') + self.assertHistory([ + dict(name="project2-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="project1-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="common-job", result=None, changes="2,1 1,1"), + dict(name="common-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="common-job", result="SUCCESS", changes="2,1 1,1"), + ], ordered=False) + self.assertEqual(len(self.fake_nodepool.history), 5) + self.assertEqual(items[0].change.project.name, 'org/project2') + self.assertEqual(len(items[0].current_build_set.retry_builds), 0) + self.assertEqual(items[1].change.project.name, 'org/project1') + self.assertEqual(len(items[1].current_build_set.retry_builds), 1) + + @simple_layout('layouts/job-dedup-auto-shared.yaml') + def test_job_deduplication_multi_scheduler(self): + # Test that a second scheduler can correctly refresh + # deduplicated builds + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') + + # A <-> B + A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + A.subject, B.data["url"] + ) + B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + B.subject, A.data["url"] + ) + + A.addApproval('Code-Review', 2) + B.addApproval('Code-Review', 2) + + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.fake_gerrit.addEvent(B.addApproval('Approved', 1)) + + self.waitUntilSettled() + + app = self.createScheduler() + app.start() + self.assertEqual(len(self.scheds), 2) + + # Hold the lock on the first scheduler so that only the second + # will act. + with self.scheds.first.sched.run_handler_lock: + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled(matcher=[app]) + + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'MERGED') + self.assertHistory([ + dict(name="project1-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="common-job", result="SUCCESS", changes="2,1 1,1"), + ], ordered=False) + + @simple_layout('layouts/job-dedup-noop.yaml') + def test_job_deduplication_noop(self): + # Test that we don't deduplicate noop (there's no good reason + # to do so) + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') + + # A <-> B + A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + A.subject, B.data["url"] + ) + B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format( + B.subject, A.data["url"] + ) + + A.addApproval('Code-Review', 2) + B.addApproval('Code-Review', 2) + + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.fake_gerrit.addEvent(B.addApproval('Approved', 1)) + + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'MERGED') + self.assertHistory([ + dict(name="project1-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="common-job", result="SUCCESS", changes="2,1 1,1"), + ], ordered=False) + # It's tricky to get info about a noop build, but the jobs in + # the report have the build UUID, so we make sure it's + # different. + a_noop = [l for l in A.messages[-1].split('\n') if 'noop' in l][0] + b_noop = [l for l in B.messages[-1].split('\n') if 'noop' in l][0] + self.assertNotEqual(a_noop, b_noop) + @simple_layout('layouts/job-dedup-retry.yaml') def test_job_deduplication_retry(self): A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py index f0f9027bd..aa2bb1758 100644 --- a/tests/unit/test_gerrit.py +++ b/tests/unit/test_gerrit.py @@ -699,6 +699,7 @@ class TestPolling(ZuulTestCase): files=file_dict) A.setMerged() self.waitForPoll('gerrit') + self.waitUntilSettled() B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') B.setCheck('zuul:check', reset=True) diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py index 1bfba36bb..49dae0ccb 100644 --- a/tests/unit/test_github_driver.py +++ b/tests/unit/test_github_driver.py @@ -38,7 +38,7 @@ from tests.base import (AnsibleZuulTestCase, BaseTestCase, simple_layout, random_sha1) from tests.base import ZuulWebFixture -EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}) +EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) class TestGithubDriver(ZuulTestCase): @@ -1384,7 +1384,7 @@ class TestGithubDriver(ZuulTestCase): # now check if the merge was done via rebase merges = [report for report in self.fake_github.github_data.reports if report[2] == 'merge'] - assert(len(merges) == 1 and merges[0][3] == 'squash') + assert (len(merges) == 1 and merges[0][3] == 'squash') @simple_layout('layouts/basic-github.yaml', driver='github') def test_invalid_event(self): diff --git a/tests/unit/test_gitlab_driver.py b/tests/unit/test_gitlab_driver.py index 2715cdef1..6c4d4eeb9 100644 --- a/tests/unit/test_gitlab_driver.py +++ b/tests/unit/test_gitlab_driver.py @@ -28,7 +28,7 @@ from tests.base import ZuulTestCase, ZuulWebFixture from testtools.matchers import MatchesRegex -EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}) +EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) class TestGitlabWebhook(ZuulTestCase): diff --git a/tests/unit/test_pagure_driver.py b/tests/unit/test_pagure_driver.py index 32635a389..92159cc9f 100644 --- a/tests/unit/test_pagure_driver.py +++ b/tests/unit/test_pagure_driver.py @@ -26,7 +26,7 @@ from zuul.zk.layout import LayoutState from tests.base import ZuulTestCase, simple_layout from tests.base import ZuulWebFixture -EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}) +EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) class TestPagureDriver(ZuulTestCase): diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index c6865a8d7..5e0385be3 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -50,8 +50,9 @@ from tests.base import ( ) from zuul.zk.change_cache import ChangeKey from zuul.zk.layout import LayoutState +from zuul.zk.locks import management_queue_lock -EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}) +EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) class TestSchedulerSSL(SSLZuulTestCase): @@ -451,6 +452,7 @@ class TestScheduler(ZuulTestCase): 'zuul.tenant.tenant-one.reconfiguration_time', 'zuul.tenant.tenant-one.pipeline.gate.event_enqueue_time', 'zuul.tenant.tenant-one.pipeline.gate.merge_request_time', + 'zuul.tenant.tenant-one.pipeline.gate.merger_merge_op_time', 'zuul.tenant.tenant-one.pipeline.gate.job_freeze_time', 'zuul.tenant.tenant-one.pipeline.gate.node_request_time', 'zuul.tenant.tenant-one.pipeline.gate.job_wait_time', @@ -4214,6 +4216,56 @@ class TestScheduler(ZuulTestCase): dict(name='check-job', result='SUCCESS', changes='1,1'), ]) + @simple_layout('layouts/trigger-sequence.yaml') + def test_live_reconfiguration_trigger_sequence(self): + # Test that events arriving after an event that triggers a + # reconfiguration are handled after the reconfiguration + # completes. + + in_repo_conf = "[{project: {tag: {jobs: [post-job]}}}]" + file_dict = {'zuul.yaml': in_repo_conf} + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A', + files=file_dict) + sched = self.scheds.first.sched + # Hold the management queue so that we don't process any + # reconfiguration events yet. + with management_queue_lock( + self.zk_client, 'tenant-one', blocking=False + ): + with sched.run_handler_lock: + A.setMerged() + # Submit two events while no processing is happening: + # A change merged event that will trigger a reconfiguration + self.fake_gerrit.addEvent(A.getChangeMergedEvent()) + + # And a tag event which should only run a job after + # the config change above is in effect. + event = self.fake_gerrit.addFakeTag( + 'org/project', 'master', 'foo') + self.fake_gerrit.addEvent(event) + + # Wait for the tenant trigger queue to empty out, and for + # us to have a tenant management as well as a pipeline + # trigger event. At this point, we should be deferring + # the trigger event until the management event is handled. + for _ in iterate_timeout(60, 'queues'): + with sched.run_handler_lock: + if sched.trigger_events['tenant-one'].hasEvents(): + continue + if not sched.pipeline_trigger_events[ + 'tenant-one']['tag'].hasEvents(): + continue + if not sched.management_events['tenant-one'].hasEvents(): + continue + break + + # Now we can resume and process the reconfiguration event + sched.wake_event.set() + self.waitUntilSettled() + self.assertHistory([ + dict(name='post-job', result='SUCCESS'), + ]) + @simple_layout('layouts/repo-deleted.yaml') def test_repo_deleted(self): self.init_repo("org/delete-project") @@ -5680,8 +5732,7 @@ For CI problems and help debugging, contact ci@example.org""" self.assertEqual(A.reported, 2) self.assertTrue(re.search('project-merge .* NODE_FAILURE', A.messages[1])) - self.assertTrue(re.search('project-test1 .* SKIPPED', A.messages[1])) - self.assertTrue(re.search('project-test2 .* SKIPPED', A.messages[1])) + self.assertTrue('Skipped 2 jobs' in A.messages[1]) def test_nodepool_resources(self): "Test that resources are reported" @@ -6803,7 +6854,7 @@ class TestDependencyGraph(ZuulTestCase): self.assertHistory([ dict(name='build', result='FAILURE', changes='1,1'), ], ordered=False) - self.assertIn('SKIPPED', A.messages[0]) + self.assertIn('Skipped 1 job', A.messages[0]) class TestDuplicatePipeline(ZuulTestCase): diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py index 7d95f2548..f371a8064 100644 --- a/tests/unit/test_sos.py +++ b/tests/unit/test_sos.py @@ -55,6 +55,79 @@ class TestScaleOutScheduler(ZuulTestCase): dict(name='project-test2', result='SUCCESS', changes='1,1'), ], ordered=False) + def test_pipeline_cache_clear(self): + # Test that the pipeline cache on a second scheduler isn't + # holding old change objects. + + # Hold jobs in build + sched1 = self.scheds.first + self.executor_server.hold_jobs_in_build = True + + # We need a pair of changes in order to populate the pipeline + # change cache (a single change doesn't activate the cache, + # it's for dependencies). + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('Code-Review', 2) + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + B.addApproval('Code-Review', 2) + B.addApproval('Approved', 1) + B.setDependsOn(A, 1) + + # Fail a job + self.executor_server.failJob('project-test1', A) + + # Enqueue into gate with scheduler 1 + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.waitUntilSettled() + + # Start scheduler 2 + sched2 = self.createScheduler() + sched2.start() + self.assertEqual(len(self.scheds), 2) + + # Pause scheduler 1 + with sched1.sched.run_handler_lock: + # Release jobs + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + # Wait for scheduler 2 to dequeue + self.waitUntilSettled(matcher=[sched2]) + # Unpause scheduler 1 + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(B.data['status'], 'NEW') + + # Clear zk change cache + self.fake_gerrit._change_cache.prune([], max_age=0) + + # At this point, scheduler 1 should have a bogus change entry + # in the pipeline cache because scheduler 2 performed the + # dequeue so scheduler 1 never cleaned up its cache. + + self.executor_server.fail_tests.clear() + self.executor_server.hold_jobs_in_build = True + # Pause scheduler 1 + with sched1.sched.run_handler_lock: + # Enqueue into gate with scheduler 2 + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.waitUntilSettled(matcher=[sched2]) + + # Pause scheduler 2 + with sched2.sched.run_handler_lock: + # Make sure that scheduler 1 does some pipeline runs which + # reconstitute state from ZK. This gives it the + # opportunity to use old cache data if we don't clear it. + + # Release job1 + self.executor_server.release() + self.waitUntilSettled(matcher=[sched1]) + # Release job2 + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + # Wait for scheduler 1 to merge change + self.waitUntilSettled(matcher=[sched1]) + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'MERGED') + @simple_layout('layouts/multi-scheduler-status.yaml') def test_multi_scheduler_status(self): self.hold_merge_jobs_in_queue = True diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py index 9fd8ea1d3..a89bb3007 100644 --- a/tests/unit/test_v3.py +++ b/tests/unit/test_v3.py @@ -4930,7 +4930,7 @@ class TestDataReturn(AnsibleZuulTestCase): self.assertIn( '- data-return https://zuul.example.com/', A.messages[-1]) - self.assertTrue(re.search('child .* SKIPPED', A.messages[-1])) + self.assertTrue('Skipped 1 job' in A.messages[-1]) self.assertIn('Build succeeded', A.messages[-1]) def test_data_return_invalid_child_job(self): @@ -4943,7 +4943,7 @@ class TestDataReturn(AnsibleZuulTestCase): self.assertIn( '- data-return-invalid-child-job https://zuul.example.com', A.messages[-1]) - self.assertTrue(re.search('data-return .* SKIPPED', A.messages[-1])) + self.assertTrue('Skipped 1 job' in A.messages[-1]) self.assertIn('Build succeeded', A.messages[-1]) def test_data_return_skip_all_child_jobs(self): @@ -4957,8 +4957,7 @@ class TestDataReturn(AnsibleZuulTestCase): self.assertIn( '- data-return-skip-all https://zuul.example.com/', A.messages[-1]) - self.assertTrue(re.search('child .* SKIPPED', A.messages[-1])) - self.assertTrue(re.search('data-return .* SKIPPED', A.messages[-1])) + self.assertTrue('Skipped 2 jobs' in A.messages[-1]) self.assertIn('Build succeeded', A.messages[-1]) def test_data_return_skip_all_child_jobs_with_soft_dependencies(self): @@ -4972,8 +4971,7 @@ class TestDataReturn(AnsibleZuulTestCase): ]) self.assertIn('- data-return-cd https://zuul.example.com/', A.messages[-1]) - self.assertTrue(re.search('data-return-a .* SKIPPED', A.messages[-1])) - self.assertTrue(re.search('data-return-b .* SKIPPED', A.messages[-1])) + self.assertTrue('Skipped 2 jobs' in A.messages[-1]) self.assertIn('Build succeeded', A.messages[-1]) def test_several_zuul_return(self): @@ -4987,7 +4985,7 @@ class TestDataReturn(AnsibleZuulTestCase): self.assertIn( '- several-zuul-return-child https://zuul.example.com/', A.messages[-1]) - self.assertTrue(re.search('data-return .* SKIPPED', A.messages[-1])) + self.assertTrue('Skipped 1 job' in A.messages[-1]) self.assertIn('Build succeeded', A.messages[-1]) def test_data_return_skip_retry(self): @@ -6905,7 +6903,7 @@ class TestJobPause(AnsibleZuulTestCase): dict(name='compile', result='SUCCESS', changes='1,1'), ]) - self.assertTrue(re.search('test .* SKIPPED', A.messages[0])) + self.assertTrue('Skipped 1 job' in A.messages[0]) def test_job_pause_pre_skipped_child(self): """ @@ -6953,7 +6951,7 @@ class TestJobPause(AnsibleZuulTestCase): dict(name='compile', result='SUCCESS', changes='1,1'), ]) - self.assertTrue(re.search('test .* SKIPPED', A.messages[0])) + self.assertTrue('Skipped 1 job' in A.messages[0]) def test_job_pause_skipped_child_retry(self): """ @@ -7822,7 +7820,7 @@ class TestProvidesRequiresMysql(ZuulTestCase): dict(name='image-builder', result='FAILURE', changes='1,1'), dict(name='hold', result='SUCCESS', changes='1,1'), ], ordered=False) - self.assertTrue(re.search('image-user .* SKIPPED', A.messages[0])) + self.assertTrue('Skipped 1 job' in A.messages[0]) B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index eadf5c855..b1f393e47 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -1290,7 +1290,7 @@ class TestLayoutStore(ZooKeeperBaseTestCase): "github": 456, } state = LayoutState("tenant", "hostname", 0, layout_uuid, - branch_cache_min_ltimes) + branch_cache_min_ltimes, -1) store["tenant"] = state self.assertEqual(state, store["tenant"]) self.assertNotEqual(state.ltime, -1) @@ -1301,9 +1301,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase): def test_ordering(self): layout_uuid = uuid.uuid4().hex state_one = LayoutState("tenant", "hostname", 1, layout_uuid, - {}, ltime=1) + {}, -1, ltime=1) state_two = LayoutState("tenant", "hostname", 2, layout_uuid, - {}, ltime=2) + {}, -1, ltime=2) self.assertGreater(state_two, state_one) @@ -1312,9 +1312,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase): min_ltimes = defaultdict(lambda x: -1) min_ltimes['foo'] = 1 state_one = LayoutState("tenant", "hostname", 1, uuid.uuid4().hex, - {}, ltime=1) + {}, -1, ltime=1) state_two = LayoutState("tenant", "hostname", 2, uuid.uuid4().hex, - {}, ltime=2) + {}, -1, ltime=2) store.setMinLtimes(state_one, min_ltimes) store.setMinLtimes(state_two, min_ltimes) store['tenant'] = state_one diff --git a/web/src/containers/status/ChangePanel.jsx b/web/src/containers/status/ChangePanel.jsx index 33fc4687c..dd4fc27e5 100644 --- a/web/src/containers/status/ChangePanel.jsx +++ b/web/src/containers/status/ChangePanel.jsx @@ -18,6 +18,7 @@ import { connect } from 'react-redux' import { Link } from 'react-router-dom' import * as moment from 'moment' import 'moment-duration-format' +import { Button } from '@patternfly/react-core' class ChangePanel extends React.Component { @@ -30,9 +31,11 @@ class ChangePanel extends React.Component { constructor () { super() this.state = { - expanded: false + expanded: false, + showSkipped: false, } this.onClick = this.onClick.bind(this) + this.toggleSkippedJobs = this.toggleSkippedJobs.bind(this) this.clicked = false } @@ -120,12 +123,13 @@ class ChangePanel extends React.Component { } renderProgressBar (change) { - let jobPercent = (100 / change.jobs.length).toFixed(2) + const interesting_jobs = change.jobs.filter(j => this.jobStrResult(j) !== 'skipped') + let jobPercent = (100 / interesting_jobs.length).toFixed(2) return ( <div className='progress zuul-change-total-result'> {change.jobs.map((job, idx) => { let result = this.jobStrResult(job) - if (['queued', 'waiting'].includes(result)) { + if (['queued', 'waiting', 'skipped'].includes(result)) { return '' } let className = '' @@ -144,7 +148,6 @@ class ChangePanel extends React.Component { className = ' progress-bar-warning' break case 'paused': - case 'skipped': className = ' progress-bar-info' break default: @@ -302,15 +305,39 @@ class ChangePanel extends React.Component { </span>) } + toggleSkippedJobs (e) { + // Skip middle mouse button + if (e.button === 1) { + return + } + this.setState({ showSkipped: !this.state.showSkipped }) + } + renderJobList (jobs, times) { + const [buttonText, interestingJobs] = this.state.showSkipped ? + ['Hide', jobs] : + ['Show', jobs.filter(j => this.jobStrResult(j) !== 'skipped')] + const skippedJobCount = jobs.length - interestingJobs.length + return ( - <ul className='list-group zuul-patchset-body'> - {jobs.map((job, idx) => ( - <li key={idx} className='list-group-item zuul-change-job'> - {this.renderJob(job, times.jobs[job.name])} - </li> - ))} - </ul>) + <> + <ul className='list-group zuul-patchset-body'> + {interestingJobs.map((job, idx) => ( + <li key={idx} className='list-group-item zuul-change-job'> + {this.renderJob(job, times.jobs[job.name])} + </li> + ))} + {(this.state.showSkipped || skippedJobCount) ? ( + <li key='last' className='list-group-item zuul-change-job'> + <Button variant="link" className='zuul-skipped-jobs-button' + onClick={this.toggleSkippedJobs}> + {buttonText} {skippedJobCount ? skippedJobCount : ''} skipped job{skippedJobCount === 1 ? '' : 's'} + </Button> + </li> + ) : ''} + </ul> + </> + ) } calculateTimes (change) { diff --git a/web/src/containers/status/ChangePanel.test.jsx b/web/src/containers/status/ChangePanel.test.jsx index 5a4be8602..cd27edc73 100644 --- a/web/src/containers/status/ChangePanel.test.jsx +++ b/web/src/containers/status/ChangePanel.test.jsx @@ -16,6 +16,7 @@ import React from 'react' import { Link, BrowserRouter as Router } from 'react-router-dom' import { Provider } from 'react-redux' import { create } from 'react-test-renderer' +import { Button } from '@patternfly/react-core' import { setTenantAction } from '../../actions/tenant' import configureStore from '../../store' @@ -45,6 +46,8 @@ it('change panel render multi tenant links', () => { const jobLink = application.root.findByType(Link) expect(jobLink.props.to).toEqual( '/t/tenant-one/stream/42') + const skipButton = application.root.findAllByType(Button) + expect(skipButton === undefined) }) it('change panel render white-label tenant links', () => { @@ -60,4 +63,29 @@ it('change panel render white-label tenant links', () => { const jobLink = application.root.findByType(Link) expect(jobLink.props.to).toEqual( '/stream/42') + const skipButton = application.root.findAllByType(Button) + expect(skipButton === undefined) +}) + +it('change panel skip jobs', () => { + const fakeChange = { + project: 'org-project', + jobs: [{ + name: 'job-name', + url: 'stream/42', + result: 'skipped' + }] + } + + const store = configureStore() + store.dispatch(setTenantAction('tenant-one', true)) + const application = create( + <Provider store={store}> + <Router> + <ChangePanel change={fakeChange} globalExpanded={true} /> + </Router> + </Provider> + ) + const skipButton = application.root.findByType(Button) + expect(skipButton.props.children.includes('skipped job')) }) diff --git a/web/src/index.css b/web/src/index.css index e8c67a372..eddbca673 100644 --- a/web/src/index.css +++ b/web/src/index.css @@ -189,6 +189,11 @@ a.refresh { font-size: small; } +.zuul-skipped-jobs-button { + font-size: small; + padding: 0; +} + .zuul-non-voting-desc { font-size: smaller; } diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index 490e47c59..6d6f16968 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -1044,6 +1044,7 @@ class Client(zuul.cmd.ZuulApp): tenant_name=args.tenant, hostname='admin command', last_reconfigured=int(time.time()), + last_reconfigure_event_ltime=-1, uuid=uuid4().hex, branch_cache_min_ltimes={}, ltime=ps._zstat.last_modified_transaction_id, diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 6aea4388b..1ec334915 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -1470,7 +1470,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): # for large projects like nova alldata = [] chunk, more_changes = _query_chunk(query, event) - while(chunk): + while chunk: alldata.extend(chunk) if more_changes is None: # continue sortKey based (before Gerrit 2.9) diff --git a/zuul/driver/github/githubreporter.py b/zuul/driver/github/githubreporter.py index de62f2565..73d3a6f13 100644 --- a/zuul/driver/github/githubreporter.py +++ b/zuul/driver/github/githubreporter.py @@ -135,9 +135,12 @@ class GithubReporter(BaseReporter): def _formatItemReportJobs(self, item): # Return the list of jobs portion of the report ret = '' - jobs_fields = self._getItemReportJobsFields(item) + jobs_fields, skipped = self._getItemReportJobsFields(item) for job_fields in jobs_fields: ret += self._formatJobResult(job_fields) + if skipped: + jobtext = 'job' if skipped == 1 else 'jobs' + ret += 'Skipped %i %s\n' % (skipped, jobtext) return ret def addPullComment(self, item, comment=None): diff --git a/zuul/driver/pagure/pagurereporter.py b/zuul/driver/pagure/pagurereporter.py index 0bfdbc9b8..b38035752 100644 --- a/zuul/driver/pagure/pagurereporter.py +++ b/zuul/driver/pagure/pagurereporter.py @@ -67,9 +67,12 @@ class PagureReporter(BaseReporter): def _formatItemReportJobs(self, item): # Return the list of jobs portion of the report ret = '' - jobs_fields = self._getItemReportJobsFields(item) + jobs_fields, skipped = self._getItemReportJobsFields(item) for job_fields in jobs_fields: ret += '- [%s](%s) : %s%s%s%s\n' % job_fields[:6] + if skipped: + jobtext = 'job' if skipped == 1 else 'jobs' + ret += 'Skipped %i %s\n' % (skipped, jobtext) return ret def addPullComment(self, item, comment=None): diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py index 4f11a583b..619b1a8ff 100644 --- a/zuul/driver/timer/__init__.py +++ b/zuul/driver/timer/__init__.py @@ -130,6 +130,24 @@ class TimerDriver(Driver, TriggerInterface): pipeline.name) continue + self._addJobsInner(tenant, pipeline, trigger, timespec, + jobs) + + def _addJobsInner(self, tenant, pipeline, trigger, timespec, jobs): + # jobs is a list that we mutate + for project_name, pcs in tenant.layout.project_configs.items(): + # timer operates on branch heads and doesn't need + # speculative layouts to decide if it should be + # enqueued or not. So it can be decided on cached + # data if it needs to run or not. + pcst = tenant.layout.getAllProjectConfigs(project_name) + if not [True for pc in pcst if pipeline.name in pc.pipelines]: + continue + + (trusted, project) = tenant.getProject(project_name) + try: + for branch in project.source.getProjectBranches( + project, tenant): # The 'misfire_grace_time' argument is set to None to # disable checking if the job missed its run time window. # This ensures we don't miss a trigger when the job is @@ -137,11 +155,17 @@ class TimerDriver(Driver, TriggerInterface): # delays are not a problem for our trigger use-case. job = self.apsched.add_job( self._onTrigger, trigger=trigger, - args=(tenant, pipeline.name, timespec,), + args=(tenant, pipeline.name, project_name, + branch, timespec,), misfire_grace_time=None) jobs.append(job) + except Exception: + self.log.exception("Unable to create APScheduler job for " + "%s %s %s", + tenant, pipeline, project) - def _onTrigger(self, tenant, pipeline_name, timespec): + def _onTrigger(self, tenant, pipeline_name, project_name, branch, + timespec): if not self.election_won: return @@ -150,55 +174,43 @@ class TimerDriver(Driver, TriggerInterface): return try: - self._dispatchEvent(tenant, pipeline_name, timespec) + self._dispatchEvent(tenant, pipeline_name, project_name, + branch, timespec) except Exception: self.stop_event.set() self.log.exception("Error when dispatching timer event") - def _dispatchEvent(self, tenant, pipeline_name, timespec): - self.log.debug('Got trigger for tenant %s and pipeline %s with ' - 'timespec %s', tenant.name, pipeline_name, timespec) - for project_name, pcs in tenant.layout.project_configs.items(): - try: - # timer operates on branch heads and doesn't need - # speculative layouts to decide if it should be - # enqueued or not. So it can be decided on cached - # data if it needs to run or not. - pcst = tenant.layout.getAllProjectConfigs(project_name) - if not [True for pc in pcst if pipeline_name in pc.pipelines]: - continue - - (trusted, project) = tenant.getProject(project_name) - for branch in project.source.getProjectBranches( - project, tenant): - try: - event = TimerTriggerEvent() - event.type = 'timer' - event.timespec = timespec - event.forced_pipeline = pipeline_name - event.project_hostname = project.canonical_hostname - event.project_name = project.name - event.ref = 'refs/heads/%s' % branch - event.branch = branch - event.zuul_event_id = str(uuid4().hex) - event.timestamp = time.time() - # Refresh the branch in order to update the item in the - # change cache. - change_key = project.source.getChangeKey(event) - with self.project_update_locks[project.canonical_name]: - project.source.getChange(change_key, refresh=True, - event=event) - log = get_annotated_logger(self.log, event) - log.debug("Adding event") - self.sched.addTriggerEvent(self.name, event) - except Exception: - self.log.exception("Error dispatching timer event for " - "project %s branch %s", - project, branch) - except Exception: - self.log.exception("Error dispatching timer event for " - "project %s", - project) + def _dispatchEvent(self, tenant, pipeline_name, project_name, + branch, timespec): + self.log.debug('Got trigger for tenant %s and pipeline %s ' + 'project %s branch %s with timespec %s', + tenant.name, pipeline_name, project_name, + branch, timespec) + try: + (trusted, project) = tenant.getProject(project_name) + event = TimerTriggerEvent() + event.type = 'timer' + event.timespec = timespec + event.forced_pipeline = pipeline_name + event.project_hostname = project.canonical_hostname + event.project_name = project.name + event.ref = 'refs/heads/%s' % branch + event.branch = branch + event.zuul_event_id = str(uuid4().hex) + event.timestamp = time.time() + # Refresh the branch in order to update the item in the + # change cache. + change_key = project.source.getChangeKey(event) + with self.project_update_locks[project.canonical_name]: + project.source.getChange(change_key, refresh=True, + event=event) + log = get_annotated_logger(self.log, event) + log.debug("Adding event") + self.sched.addTriggerEvent(self.name, event) + except Exception: + self.log.exception("Error dispatching timer event for " + "tenant %s project %s branch %s", + tenant, project_name, branch) def stop(self): self.stopped = True diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 2165a797b..eac7fa7e5 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -3735,7 +3735,7 @@ class ExecutorServer(BaseMergeServer): sensor.reportStats(self.statsd, base_key) def finishJob(self, unique): - del(self.job_workers[unique]) + del self.job_workers[unique] self.log.debug( "Finishing Job: %s, queue(%d): %s", unique, diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 642aededd..a66f5ad22 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -235,6 +235,9 @@ class PipelineManager(metaclass=ABCMeta): resolved_changes.append(change) return resolved_changes + def clearCache(self): + self._change_cache.clear() + def _maintainCache(self): active_layout_uuids = set() referenced_change_keys = set() @@ -1768,9 +1771,12 @@ class PipelineManager(metaclass=ABCMeta): build_in_items = [item] if item.bundle: for other_item in item.bundle.items: - if other_item not in build_in_items: - if other_item.current_build_set.getBuild(build.job.name): - build_in_items.append(other_item) + if other_item in build_in_items: + continue + other_build = other_item.current_build_set.getBuild( + build.job.name) + if other_build is not None and other_build is build: + build_in_items.append(other_item) for item in build_in_items: # We don't care about some actions below if this build # isn't in the current buildset, so determine that before @@ -1817,18 +1823,27 @@ class PipelineManager(metaclass=ABCMeta): # We're the second of the files/merger pair, report the stat self.reportPipelineTiming('merge_request_time', build_set.configured_time) + if event.elapsed_time: + self.reportPipelineTiming('merger_files_changes_op_time', + event.elapsed_time, elapsed=True) def onMergeCompleted(self, event, build_set): if build_set.merge_state == build_set.COMPLETE: self._onGlobalRepoStateCompleted(event, build_set) self.reportPipelineTiming('repo_state_time', build_set.repo_state_request_time) + if event.elapsed_time: + self.reportPipelineTiming('merger_repo_state_op_time', + event.elapsed_time, elapsed=True) else: self._onMergeCompleted(event, build_set) if build_set.files_state == build_set.COMPLETE: # We're the second of the files/merger pair, report the stat self.reportPipelineTiming('merge_request_time', build_set.configured_time) + if event.elapsed_time: + self.reportPipelineTiming('merger_merge_op_time', + event.elapsed_time, elapsed=True) def _onMergeCompleted(self, event, build_set): @@ -2120,7 +2135,7 @@ class PipelineManager(metaclass=ABCMeta): except Exception: self.log.exception("Exception reporting pipeline stats") - def reportPipelineTiming(self, key, start, end=None): + def reportPipelineTiming(self, key, start, end=None, elapsed=False): if not self.sched.statsd: return if not start: @@ -2130,5 +2145,8 @@ class PipelineManager(metaclass=ABCMeta): pipeline = self.pipeline tenant = pipeline.tenant stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}' - dt = (end - start) * 1000 + if elapsed: + dt = start + else: + dt = (end - start) * 1000 self.sched.statsd.timing(f'{stats_key}.{key}', dt) diff --git a/zuul/merger/client.py b/zuul/merger/client.py index 362644b98..29fa39aaf 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -159,7 +159,9 @@ class MergeClient(object): "via result event for %s", merge_request) if merge_request.job_type == MergeRequest.FILES_CHANGES: event = FilesChangesCompletedEvent( - merge_request.build_set_uuid, files=None + merge_request.build_set_uuid, + files=None, + elapsed_time=None, ) else: event = MergeCompletedEvent( @@ -172,6 +174,7 @@ class MergeClient(object): repo_state=None, item_in_branches=None, errors=None, + elapsed_time=None, ) try: self.result_events[merge_request.tenant_name][ diff --git a/zuul/merger/server.py b/zuul/merger/server.py index 91597714f..fe5b938a1 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -256,6 +256,7 @@ class BaseMergeServer(metaclass=ABCMeta): def executeMergeJob(self, merge_request, params): result = None + start = time.monotonic() if merge_request.job_type == MergeRequest.MERGE: result = self.merge(merge_request, params) elif merge_request.job_type == MergeRequest.CAT: @@ -264,6 +265,8 @@ class BaseMergeServer(metaclass=ABCMeta): result = self.refstate(merge_request, params) elif merge_request.job_type == MergeRequest.FILES_CHANGES: result = self.fileschanges(merge_request, params) + end = time.monotonic() + result['elapsed_time'] = end - start return result def cat(self, merge_request, args): @@ -376,6 +379,7 @@ class BaseMergeServer(metaclass=ABCMeta): item_in_branches = result.get("item_in_branches", []) files = result.get("files", {}) errors = result.get("errors", []) + elapsed_time = result.get("elapsed_time") log.info( "Merge %s complete, merged: %s, updated: %s, commit: %s, " @@ -407,7 +411,9 @@ class BaseMergeServer(metaclass=ABCMeta): ) if merge_request.job_type == MergeRequest.FILES_CHANGES: event = FilesChangesCompletedEvent( - merge_request.build_set_uuid, files + merge_request.build_set_uuid, + files, + elapsed_time, ) else: event = MergeCompletedEvent( @@ -420,6 +426,7 @@ class BaseMergeServer(metaclass=ABCMeta): repo_state, item_in_branches, errors, + elapsed_time, ) def put_complete_event(log, merge_request, event): diff --git a/zuul/model.py b/zuul/model.py index aa814ce6c..963332826 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -694,6 +694,11 @@ class PipelineState(zkobject.ZKObject): return json.dumps(data, sort_keys=True).encode("utf8") def deserialize(self, raw, context): + # We may have old change objects in the pipeline cache, so + # make sure they are the same objects we would get from the + # source change cache. + self.pipeline.manager.clearCache() + data = super().deserialize(raw, context) existing_queues = { q.getPath(): q for q in self.queues + self.old_queues @@ -742,8 +747,56 @@ class PipelineState(zkobject.ZKObject): "queues": queues, "old_queues": old_queues, }) + if context.build_references: + self._fixBuildReferences(data, context) + context.build_references = False return data + def _fixBuildReferences(self, data, context): + # Reconcile duplicate builds; if we find any BuildReference + # objects, look up the actual builds and replace + log = context.log + build_map = {} + to_replace_dicts = [] + to_replace_lists = [] + for queue in data['queues'] + data['old_queues']: + for item in queue.queue: + buildset = item.current_build_set + for build_job, build in buildset.builds.items(): + if isinstance(build, BuildReference): + to_replace_dicts.append((item, + buildset.builds, + build_job, + build._path)) + else: + build_map[build.getPath()] = build + for job_name, build_list in buildset.retry_builds.items(): + for build in build_list: + if isinstance(build, BuildReference): + to_replace_lists.append((item, + build_list, + build, + build._path)) + else: + build_map[build.getPath()] = build + for (item, build_dict, build_job, build_path) in to_replace_dicts: + orig_build = build_map.get(build_path) + if orig_build: + build_dict[build_job] = orig_build + else: + log.warning("Unable to find deduplicated build %s for %s", + build_path, item) + del build_dict[build_job] + for (item, build_list, build, build_path) in to_replace_lists: + idx = build_list.index(build) + orig_build = build_map.get(build_path) + if orig_build: + build_list[idx] = build_map[build_path] + else: + log.warning("Unable to find deduplicated build %s for %s", + build_path, item) + del build_list[idx] + def _getKnownItems(self): items = [] for queue in (*self.old_queues, *self.queues): @@ -3424,6 +3477,11 @@ class BuildRequest(JobRequest): ) +class BuildReference: + def __init__(self, _path): + self._path = _path + + class Build(zkobject.ZKObject): """A Build is an instance of a single execution of a Job. @@ -3852,6 +3910,13 @@ class BuildSet(zkobject.ZKObject): } return json.dumps(data, sort_keys=True).encode("utf8") + def _isMyBuild(self, build_path): + parts = build_path.split('/') + buildset_uuid = parts[-5] + if buildset_uuid == self.uuid: + return True + return False + def deserialize(self, raw, context): data = super().deserialize(raw, context) # Set our UUID so that getPath() returns the correct path for @@ -3944,8 +4009,12 @@ class BuildSet(zkobject.ZKObject): if not build.result: build.refresh(context) else: - build = Build.fromZK( - context, build_path, job=job, build_set=self) + if not self._isMyBuild(build_path): + build = BuildReference(build_path) + context.build_references = True + else: + build = Build.fromZK( + context, build_path, job=job, build_set=self) builds[job_name] = build for retry_path in data["retry_builds"].get(job_name, []): @@ -3954,8 +4023,12 @@ class BuildSet(zkobject.ZKObject): # Retry builds never change. pass else: - retry_build = Build.fromZK( - context, retry_path, job=job, build_set=self) + if not self._isMyBuild(retry_path): + retry_build = BuildReference(retry_path) + context.build_references = True + else: + retry_build = Build.fromZK( + context, retry_path, job=job, build_set=self) retry_builds[job_name].append(retry_build) data.update({ @@ -5910,6 +5983,7 @@ class TenantReconfigureEvent(ManagementEvent): self.tenant_name = tenant_name self.project_branches = set([(project_name, branch_name)]) self.branch_cache_ltimes = {} + self.trigger_event_ltime = -1 self.merged_events = [] def __ne__(self, other): @@ -5931,6 +6005,8 @@ class TenantReconfigureEvent(ManagementEvent): self.branch_cache_ltimes.get(connection_name, ltime), ltime) self.zuul_event_ltime = max(self.zuul_event_ltime, other.zuul_event_ltime) + self.trigger_event_ltime = max(self.trigger_event_ltime, + other.trigger_event_ltime) self.merged_events.append(other) def toDict(self): @@ -5938,6 +6014,7 @@ class TenantReconfigureEvent(ManagementEvent): d["tenant_name"] = self.tenant_name d["project_branches"] = list(self.project_branches) d["branch_cache_ltimes"] = self.branch_cache_ltimes + d["trigger_event_ltime"] = self.trigger_event_ltime return d @classmethod @@ -5953,6 +6030,7 @@ class TenantReconfigureEvent(ManagementEvent): tuple(pb) for pb in data["project_branches"] ) event.branch_cache_ltimes = data.get("branch_cache_ltimes", {}) + event.trigger_event_ltime = data.get("trigger_event_ltime", -1) return event @@ -6175,12 +6253,13 @@ class MergeCompletedEvent(ResultEvent): :arg dict repo_state: The starting repo state before the merge. :arg list item_in_branches: A list of branches in which the final commit in the merge list appears (changes without refs). - :arg list errors: A list of error message strings + :arg list errors: A list of error message strings. + :arg float elapsed_time: Elapsed time of merge op in seconds. """ def __init__(self, request_uuid, build_set_uuid, merged, updated, commit, files, repo_state, item_in_branches, - errors): + errors, elapsed_time): self.request_uuid = request_uuid self.build_set_uuid = build_set_uuid self.merged = merged @@ -6190,6 +6269,7 @@ class MergeCompletedEvent(ResultEvent): self.repo_state = repo_state or {} self.item_in_branches = item_in_branches or [] self.errors = errors or [] + self.elapsed_time = elapsed_time def __repr__(self): return ('<MergeCompletedEvent job: %s buildset: %s merged: %s ' @@ -6209,6 +6289,7 @@ class MergeCompletedEvent(ResultEvent): "repo_state": dict(self.repo_state), "item_in_branches": list(self.item_in_branches), "errors": list(self.errors), + "elapsed_time": self.elapsed_time, } @classmethod @@ -6223,6 +6304,7 @@ class MergeCompletedEvent(ResultEvent): dict(data.get("repo_state", {})), list(data.get("item_in_branches", [])), list(data.get("errors", [])), + data.get("elapsed_time"), ) @@ -6231,16 +6313,19 @@ class FilesChangesCompletedEvent(ResultEvent): :arg BuildSet build_set: The build_set which is ready. :arg list files: List of files changed. + :arg float elapsed_time: Elapsed time of merge op in seconds. """ - def __init__(self, build_set_uuid, files): + def __init__(self, build_set_uuid, files, elapsed_time): self.build_set_uuid = build_set_uuid self.files = files or [] + self.elapsed_time = elapsed_time def toDict(self): return { "build_set_uuid": self.build_set_uuid, "files": list(self.files), + "elapsed_time": self.elapsed_time, } @classmethod @@ -6248,6 +6333,7 @@ class FilesChangesCompletedEvent(ResultEvent): return cls( data.get("build_set_uuid"), list(data.get("files", [])), + data.get("elapsed_time"), ) @@ -6289,6 +6375,9 @@ class TriggerEvent(AbstractEvent): self.branch_deleted = False self.branch_protected = True self.ref = None + # For reconfiguration sequencing + self.min_reconfigure_ltime = -1 + self.zuul_event_ltime = None # For management events (eg: enqueue / promote) self.tenant_name = None self.project_hostname = None @@ -6326,6 +6415,8 @@ class TriggerEvent(AbstractEvent): "branch_deleted": self.branch_deleted, "branch_protected": self.branch_protected, "ref": self.ref, + "min_reconfigure_ltime": self.min_reconfigure_ltime, + "zuul_event_ltime": self.zuul_event_ltime, "tenant_name": self.tenant_name, "project_hostname": self.project_hostname, "project_name": self.project_name, @@ -6358,6 +6449,8 @@ class TriggerEvent(AbstractEvent): self.branch_deleted = d["branch_deleted"] self.branch_protected = d["branch_protected"] self.ref = d["ref"] + self.min_reconfigure_ltime = d.get("min_reconfigure_ltime", -1) + self.zuul_event_ltime = d.get("zuul_event_ltime", None) self.tenant_name = d["tenant_name"] self.project_hostname = d["project_hostname"] self.project_name = d["project_name"] @@ -7066,6 +7159,7 @@ class Layout(object): noop = Job('noop') noop.description = 'A job that will always succeed, no operation.' noop.parent = noop.BASE_JOB_MARKER + noop.deduplicate = False noop.run = (PlaybookContext(None, 'noop.yaml', [], []),) self.jobs = {'noop': [noop]} self.nodesets = {} diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py index 9b8f2c11c..5723316a3 100644 --- a/zuul/reporter/__init__.py +++ b/zuul/reporter/__init__.py @@ -257,9 +257,13 @@ class BaseReporter(object, metaclass=abc.ABCMeta): # Extract the report elements from an item config = self.connection.sched.config jobs_fields = [] + skipped = 0 for job in item.getJobs(): build = item.current_build_set.getBuild(job.name) (result, url) = item.formatJobResult(job) + if result == 'SKIPPED': + skipped += 1 + continue if not job.voting: voting = ' (non-voting)' else: @@ -300,12 +304,15 @@ class BaseReporter(object, metaclass=abc.ABCMeta): success_message = job.success_message jobs_fields.append( (name, url, result, error, elapsed, voting, success_message)) - return jobs_fields + return jobs_fields, skipped def _formatItemReportJobs(self, item): # Return the list of jobs portion of the report ret = '' - jobs_fields = self._getItemReportJobsFields(item) + jobs_fields, skipped = self._getItemReportJobsFields(item) for job_fields in jobs_fields: ret += '- %s%s : %s%s%s%s\n' % job_fields[:6] + if skipped: + jobtext = 'job' if skipped == 1 else 'jobs' + ret += 'Skipped %i %s\n' % (skipped, jobtext) return ret diff --git a/zuul/scheduler.py b/zuul/scheduler.py index b436c356f..272235757 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -860,9 +860,14 @@ class Scheduler(threading.Thread): self.log.exception("Exception reporting runtime stats") def reconfigureTenant(self, tenant, project, trigger_event): + if trigger_event: + trigger_event_ltime = trigger_event.zuul_event_ltime + else: + trigger_event_ltime = None self.log.debug("Submitting tenant reconfiguration event for " - "%s due to event %s in project %s", - tenant.name, trigger_event, project) + "%s due to event %s in project %s, ltime %s", + tenant.name, trigger_event, project, + trigger_event_ltime) branch = trigger_event and trigger_event.branch event = TenantReconfigureEvent( tenant.name, project.canonical_name, branch, @@ -870,6 +875,7 @@ class Scheduler(threading.Thread): if trigger_event: event.branch_cache_ltimes[trigger_event.connection_name] = ( trigger_event.branch_cache_ltime) + event.trigger_event_ltime = trigger_event_ltime self.management_events[tenant.name].put(event, needs_result=False) def fullReconfigureCommandHandler(self): @@ -970,7 +976,7 @@ class Scheduler(threading.Thread): if layout_state is None: # Reconfigure only tenants w/o an existing layout state ctx = self.createZKContext(tlock, self.log) - self._reconfigureTenant(ctx, min_ltimes, tenant) + self._reconfigureTenant(ctx, min_ltimes, -1, tenant) self._reportInitialStats(tenant) else: self.local_layout_state[tenant_name] = layout_state @@ -1422,6 +1428,7 @@ class Scheduler(threading.Thread): ctx = self.createZKContext(lock, self.log) if tenant is not None: self._reconfigureTenant(ctx, min_ltimes, + -1, tenant, old_tenant) else: self._reconfigureDeleteTenant(ctx, old_tenant) @@ -1485,6 +1492,7 @@ class Scheduler(threading.Thread): tenant = self.abide.tenants[event.tenant_name] ctx = self.createZKContext(lock, self.log) self._reconfigureTenant(ctx, min_ltimes, + event.trigger_event_ltime, tenant, old_tenant) duration = round(time.monotonic() - start, 3) self.log.info("Tenant reconfiguration complete for %s (duration: %s " @@ -1639,7 +1647,8 @@ class Scheduler(threading.Thread): request) self.cancelJob(build_set, request_job) - def _reconfigureTenant(self, context, min_ltimes, tenant, + def _reconfigureTenant(self, context, min_ltimes, + last_reconfigure_event_ltime, tenant, old_tenant=None): # This is called from _doReconfigureEvent while holding the # layout lock @@ -1666,10 +1675,29 @@ class Scheduler(threading.Thread): for s in self.connections.getSources() } + # Make sure last_reconfigure_event_ltime never goes backward + old_layout_state = self.tenant_layout_state.get(tenant.name) + if old_layout_state: + if (old_layout_state.last_reconfigure_event_ltime > + last_reconfigure_event_ltime): + self.log.debug("Setting layout state last reconfigure ltime " + "to previous ltime %s which is newer than %s", + old_layout_state.last_reconfigure_event_ltime, + last_reconfigure_event_ltime) + last_reconfigure_event_ltime =\ + old_layout_state.last_reconfigure_event_ltime + if last_reconfigure_event_ltime < 0: + last_reconfigure_event_ltime = self.zk_client.getCurrentLtime() + self.log.debug("Setting layout state last reconfigure ltime " + "to current ltime %s", last_reconfigure_event_ltime) + else: + self.log.debug("Setting layout state last reconfigure ltime " + "to %s", last_reconfigure_event_ltime) layout_state = LayoutState( tenant_name=tenant.name, hostname=self.hostname, last_reconfigured=int(time.time()), + last_reconfigure_event_ltime=last_reconfigure_event_ltime, uuid=tenant.layout.uuid, branch_cache_min_ltimes=branch_cache_min_ltimes, ) @@ -2178,6 +2206,8 @@ class Scheduler(threading.Thread): "Unable to refresh pipeline change list for %s", pipeline.name) + # Get the ltime of the last reconfiguration event + self.trigger_events[tenant.name].refreshMetadata() for event in self.trigger_events[tenant.name]: log = get_annotated_logger(self.log, event.zuul_event_id) log.debug("Forwarding trigger event %s", event) @@ -2266,7 +2296,15 @@ class Scheduler(threading.Thread): # out cached data for this project and perform a # reconfiguration. self.reconfigureTenant(tenant, change.project, event) - + # This will become the new required minimum event ltime + # for every trigger event processed after the + # reconfiguration, so make sure we update it after having + # submitted the reconfiguration event. + self.trigger_events[tenant.name].last_reconfigure_event_ltime =\ + event.zuul_event_ltime + + event.min_reconfigure_ltime = self.trigger_events[ + tenant.name].last_reconfigure_event_ltime for pipeline in tenant.layout.pipelines.values(): if ( pipeline.manager.eventMatches(event, change) @@ -2281,6 +2319,21 @@ class Scheduler(threading.Thread): if self._stopped: return log = get_annotated_logger(self.log, event.zuul_event_id) + if not isinstance(event, SupercedeEvent): + local_state = self.local_layout_state[tenant.name] + last_ltime = local_state.last_reconfigure_event_ltime + # The event tells us the ltime of the most recent + # reconfiguration event up to that point. If our local + # layout state wasn't generated by an event after that + # time, then we are too out of date to process this event. + # Abort now and wait for an update. + if (event.min_reconfigure_ltime > -1 and + event.min_reconfigure_ltime > last_ltime): + log.debug("Trigger event minimum reconfigure ltime of %s " + "newer than current reconfigure ltime of %s, " + "aborting early", + event.min_reconfigure_ltime, last_ltime) + return log.debug("Processing trigger event %s", event) try: if isinstance(event, SupercedeEvent): diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py index 8718f609c..52ffd582e 100644 --- a/zuul/zk/event_queues.py +++ b/zuul/zk/event_queues.py @@ -776,7 +776,7 @@ class TriggerEventQueue(ZooKeeperEventQueue): self._put(data) def __iter__(self): - for data, ack_ref, _ in self._iterEvents(): + for data, ack_ref, zstat in self._iterEvents(): try: if (data["driver_name"] is None and data["event_type"] == "SupercedeEvent"): @@ -793,6 +793,9 @@ class TriggerEventQueue(ZooKeeperEventQueue): event = event_class.fromDict(event_data) event.ack_ref = ack_ref event.driver_name = data["driver_name"] + # Initialize the logical timestamp if not valid + if event.zuul_event_ltime is None: + event.zuul_event_ltime = zstat.creation_transaction_id yield event @@ -803,6 +806,28 @@ class TenantTriggerEventQueue(TriggerEventQueue): queue_root = TENANT_TRIGGER_ROOT.format( tenant=tenant_name) super().__init__(client, queue_root, connections) + self.metadata = {} + + def _setQueueMetadata(self): + encoded_data = json.dumps( + self.metadata, sort_keys=True).encode("utf-8") + self.kazoo_client.set(self.queue_root, encoded_data) + + def refreshMetadata(self): + data, zstat = self.kazoo_client.get(self.queue_root) + try: + self.metadata = json.loads(data) + except json.JSONDecodeError: + self.metadata = {} + + @property + def last_reconfigure_event_ltime(self): + return self.metadata.get('last_reconfigure_event_ltime', -1) + + @last_reconfigure_event_ltime.setter + def last_reconfigure_event_ltime(self, val): + self.metadata['last_reconfigure_event_ltime'] = val + self._setQueueMetadata() @classmethod def createRegistry(cls, client, connections): diff --git a/zuul/zk/layout.py b/zuul/zk/layout.py index 386a93cc4..533226767 100644 --- a/zuul/zk/layout.py +++ b/zuul/zk/layout.py @@ -49,12 +49,15 @@ class LayoutState: """ def __init__(self, tenant_name, hostname, last_reconfigured, uuid, - branch_cache_min_ltimes, ltime=-1): + branch_cache_min_ltimes, last_reconfigure_event_ltime, + ltime=-1): self.uuid = uuid self.ltime = ltime self.tenant_name = tenant_name self.hostname = hostname self.last_reconfigured = last_reconfigured + self.last_reconfigure_event_ltime =\ + last_reconfigure_event_ltime self.branch_cache_min_ltimes = branch_cache_min_ltimes def toDict(self): @@ -62,6 +65,8 @@ class LayoutState: "tenant_name": self.tenant_name, "hostname": self.hostname, "last_reconfigured": self.last_reconfigured, + "last_reconfigure_event_ltime": + self.last_reconfigure_event_ltime, "uuid": self.uuid, "branch_cache_min_ltimes": self.branch_cache_min_ltimes, } @@ -74,6 +79,7 @@ class LayoutState: data["last_reconfigured"], data.get("uuid"), data.get("branch_cache_min_ltimes"), + data.get("last_reconfigure_event_ltime", -1), data.get("ltime", -1), ) diff --git a/zuul/zk/nodepool.py b/zuul/zk/nodepool.py index 109053f11..1be4dc2b5 100644 --- a/zuul/zk/nodepool.py +++ b/zuul/zk/nodepool.py @@ -12,6 +12,7 @@ import json import logging +import os import time from enum import Enum from typing import Optional, List @@ -43,7 +44,7 @@ class ZooKeeperNodepool(ZooKeeperBase): Class implementing Nodepool related ZooKeeper interface. """ NODES_ROOT = "/nodepool/nodes" - LAUNCHER_ROOT = "/nodepool/launchers" + COMPONENT_ROOT = "/nodepool/components" REQUEST_ROOT = '/nodepool/requests' REQUEST_LOCK_ROOT = "/nodepool/requests-lock" HOLD_REQUEST_ROOT = '/zuul/hold-requests' @@ -95,9 +96,6 @@ class ZooKeeperNodepool(ZooKeeperBase): self._node_tree.close() self._node_tree = None - def _launcherPath(self, launcher): - return "%s/%s" % (self.LAUNCHER_ROOT, launcher) - def _nodePath(self, node): return "%s/%s" % (self.NODES_ROOT, node) @@ -113,15 +111,15 @@ class ZooKeeperNodepool(ZooKeeperBase): :returns: A list of Launcher objects, or empty list if none are found. """ + root_path = os.path.join(self.COMPONENT_ROOT, 'pool') try: - launcher_ids = self.kazoo_client\ - .get_children(self.LAUNCHER_ROOT) + pools = self.kazoo_client.get_children(root_path) except NoNodeError: return [] objs = [] - for launcher in launcher_ids: - path = self._launcherPath(launcher) + for pool in pools: + path = os.path.join(root_path, pool) try: data, _ = self.kazoo_client.get(path) except NoNodeError: diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index aa32b8b9b..8de3f34ba 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -43,6 +43,7 @@ class ZKContext: self.cumulative_write_znodes = 0 self.cumulative_read_bytes = 0 self.cumulative_write_bytes = 0 + self.build_references = False def sessionIsValid(self): return ((not self.lock or self.lock.is_still_valid()) and |