summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/source/developer/index.rst1
-rw-r--r--doc/source/developer/metrics.rst74
-rw-r--r--doc/source/drivers/timer.rst18
-rw-r--r--doc/source/monitoring.rst34
-rw-r--r--releasenotes/notes/timer-jitter-3d3df10d0e75f892.yaml7
-rw-r--r--tests/base.py13
-rw-r--r--tests/fixtures/layouts/job-dedup-false.yaml1
-rw-r--r--tests/fixtures/layouts/job-dedup-noop.yaml55
-rw-r--r--tests/fixtures/layouts/trigger-sequence.yaml75
-rw-r--r--tests/unit/test_circular_dependencies.py145
-rw-r--r--tests/unit/test_gerrit.py1
-rw-r--r--tests/unit/test_github_driver.py4
-rw-r--r--tests/unit/test_gitlab_driver.py2
-rw-r--r--tests/unit/test_pagure_driver.py2
-rw-r--r--tests/unit/test_scheduler.py59
-rw-r--r--tests/unit/test_sos.py73
-rw-r--r--tests/unit/test_v3.py18
-rw-r--r--tests/unit/test_zk.py10
-rw-r--r--web/src/containers/status/ChangePanel.jsx49
-rw-r--r--web/src/containers/status/ChangePanel.test.jsx28
-rw-r--r--web/src/index.css5
-rwxr-xr-xzuul/cmd/client.py1
-rw-r--r--zuul/driver/gerrit/gerritconnection.py2
-rw-r--r--zuul/driver/github/githubreporter.py5
-rw-r--r--zuul/driver/pagure/pagurereporter.py5
-rw-r--r--zuul/driver/timer/__init__.py106
-rw-r--r--zuul/executor/server.py2
-rw-r--r--zuul/manager/__init__.py28
-rw-r--r--zuul/merger/client.py5
-rw-r--r--zuul/merger/server.py9
-rw-r--r--zuul/model.py108
-rw-r--r--zuul/reporter/__init__.py11
-rw-r--r--zuul/scheduler.py63
-rw-r--r--zuul/zk/event_queues.py27
-rw-r--r--zuul/zk/layout.py8
-rw-r--r--zuul/zk/nodepool.py14
-rw-r--r--zuul/zk/zkobject.py1
37 files changed, 939 insertions, 130 deletions
diff --git a/doc/source/developer/index.rst b/doc/source/developer/index.rst
index 52266a175..b45c75640 100644
--- a/doc/source/developer/index.rst
+++ b/doc/source/developer/index.rst
@@ -14,6 +14,7 @@ Zuul, though advanced users may find it interesting.
drivers
triggers
testing
+ metrics
docs
ansible
javascript
diff --git a/doc/source/developer/metrics.rst b/doc/source/developer/metrics.rst
new file mode 100644
index 000000000..913a591ba
--- /dev/null
+++ b/doc/source/developer/metrics.rst
@@ -0,0 +1,74 @@
+:title: Metrics
+
+Metrics
+=======
+
+Event Overview
+--------------
+
+The following table illustrates the event and pipeline processing
+sequence as it relates to some of the metrics described in
+:ref:`statsd`. This is intended as general guidance only and is not
+an exhaustive list.
+
++----------------------------------------+------+------+------+--------------------------------------+
+| Event | Metrics | Attribute |
++========================================+======+======+======+======================================+
+| Event generated by source | | | | event.timestamp |
++----------------------------------------+------+ + +--------------------------------------+
+| Enqueued into driver queue | | | | |
++----------------------------------------+------+ + +--------------------------------------+
+| Enqueued into tenant trigger queue | | | | event.arrived_at_scheduler_timestamp |
++----------------------------------------+ + [8] + +--------------------------------------+
+| Forwarded to matching pipelines | [1] | | | |
++----------------------------------------+ + + +--------------------------------------+
+| Changes enqueued ahead | | | | |
++----------------------------------------+ + + +--------------------------------------+
+| Change enqueued | | | | item.enqueue_time |
++----------------------------------------+------+------+ +--------------------------------------+
+| Changes enqueued behind | | | | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Set item configuration | | | | build_set.configured_time |
++----------------------------------------+------+------+ +--------------------------------------+
+| Request files changed (if needed) | | | | |
++----------------------------------------+ +------+ +--------------------------------------+
+| Request merge | [2] | | | |
++----------------------------------------+ +------+ +--------------------------------------+
+| Wait for merge (and files if needed) | | | [9] | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Generate dynamic layout (if needed) | [3] | | | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Freeze job graph | [4] | | | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Request global repo state (if needed) | | | | build_set.repo_state_request_time |
++----------------------------------------+ [5] +------+ +--------------------------------------+
+| Wait for global repo state (if needed) | | | | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Deduplicate jobs | | | | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Acquire semaphore (non-resources-first)| | | | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Request nodes | | | | request.created_time |
++----------------------------------------+ [6] +------+ +--------------------------------------+
+| Wait for nodes | | | | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Acquire semaphore (resources-first) | | | | |
++----------------------------------------+------+------+ +--------------------------------------+
+| Enqueue build request | | | | build.execute_time |
++----------------------------------------+ [7] +------+ +--------------------------------------+
+| Executor starts job | | | | build.start_time |
++----------------------------------------+------+------+------+--------------------------------------+
+
+====== =============================
+Metric Name
+====== =============================
+1 event_enqueue_processing_time
+2 merge_request_time
+3 layout_generation_time
+4 job_freeze_time
+5 repo_state_time
+6 node_request_time
+7 job_wait_time
+8 event_enqueue_time
+9 event_job_time
+====== =============================
diff --git a/doc/source/drivers/timer.rst b/doc/source/drivers/timer.rst
index ff50b10ba..1d7931c5e 100644
--- a/doc/source/drivers/timer.rst
+++ b/doc/source/drivers/timer.rst
@@ -14,9 +14,9 @@ Timers don't require a special connection or driver. Instead they can
simply be used by listing ``timer`` as the trigger.
This trigger will run based on a cron-style time specification. It
-will enqueue an event into its pipeline for every project defined in
-the configuration. Any job associated with the pipeline will run in
-response to that event.
+will enqueue an event into its pipeline for every project and branch
+defined in the configuration. Any job associated with the pipeline
+will run in response to that event.
.. attr:: pipeline.trigger.timer
@@ -27,9 +27,9 @@ response to that event.
The time specification in cron syntax. Only the 5 part syntax
is supported, not the symbolic names. Example: ``0 0 * * *``
- runs at midnight. The first weekday is Monday.
- An optional 6th part specifies seconds. The optional 7th part
- specifies a jitter in seconds. This advances or delays the
- trigger randomly, limited by the specified value.
- Example ``0 0 * * * * 60`` runs at midnight with a +/- 60
- seconds jitter.
+ runs at midnight. The first weekday is Monday. An optional 6th
+ part specifies seconds. The optional 7th part specifies a
+ jitter in seconds. This delays the trigger randomly, limited by
+ the specified value. Example ``0 0 * * * * 60`` runs at
+ midnight or randomly up to 60 seconds later. The jitter is
+ applied individually to each project-branch combination.
diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst
index 0c2cb4351..1cb61ee01 100644
--- a/doc/source/monitoring.rst
+++ b/doc/source/monitoring.rst
@@ -110,7 +110,27 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
operation(s). This will always include a request to a Zuul
merger to speculatively merge the change, but it may also
include a second request submitted in parallel to identify
- the files altered by the change.
+ the files altered by the change. Includes
+ :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merger_merge_op_time`
+ and
+ :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merger_files_changes_op_time`.
+
+ .. stat:: merger_merge_op_time
+ :type: timer
+
+ The amount of time the merger spent performing a merge
+ operation. This does not include any of the round-trip time
+ from the scheduler to the merger, or any other merge
+ operations.
+
+ .. stat:: merger_files_changes_op_time
+ :type: timer
+
+ The amount of time the merger spent performing a files-changes
+ operation to detect changed files (this is sometimes
+ performed if the source does not provide this information).
+ This does not include any of the round-trip time from the
+ scheduler to the merger, or any other merge operations.
.. stat:: layout_generation_time
:type: timer
@@ -128,7 +148,17 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The amount of time waiting for a secondary Zuul merger
operation to collect additional information about the repo
- state of required projects.
+ state of required projects. Includes
+ :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.merger_repo_state_op_time`.
+
+ .. stat:: merger_repo_state_op_time
+ :type: timer
+
+ The amount of time the merger spent performing a repo state
+ operation to collect additional information about the repo
+ state of required projects. This does not include any of the
+ round-trip time from the scheduler to the merger, or any
+ other merge operations.
.. stat:: node_request_time
:type: timer
diff --git a/releasenotes/notes/timer-jitter-3d3df10d0e75f892.yaml b/releasenotes/notes/timer-jitter-3d3df10d0e75f892.yaml
new file mode 100644
index 000000000..d209f4a87
--- /dev/null
+++ b/releasenotes/notes/timer-jitter-3d3df10d0e75f892.yaml
@@ -0,0 +1,7 @@
+---
+features:
+ - |
+ Pipeline timer triggers with jitter now apply the jitter to each
+ project-branch individually (instead of to the pipeline as a
+ whole). This can reduce the thundering herd effect on external
+ systems for periodic pipelines with many similar jobs.
diff --git a/tests/base.py b/tests/base.py
index 5a85ea0d7..cebcf2e1f 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -3091,6 +3091,8 @@ class FakeBuild(object):
self.paused = False
self.aborted = False
self.requeue = False
+ self.should_fail = False
+ self.should_retry = False
self.created = time.time()
self.changes = None
items = self.parameters['zuul']['items']
@@ -3162,6 +3164,8 @@ class FakeBuild(object):
return result
def shouldFail(self):
+ if self.should_fail:
+ return True
changes = self.executor_server.fail_tests.get(self.name, [])
for change in changes:
if self.hasChanges(change):
@@ -3169,6 +3173,8 @@ class FakeBuild(object):
return False
def shouldRetry(self):
+ if self.should_retry:
+ return True
entries = self.executor_server.retry_tests.get(self.name, [])
for entry in entries:
if self.hasChanges(entry['change']):
@@ -3662,7 +3668,7 @@ class FakeSMTP(object):
class FakeNodepool(object):
REQUEST_ROOT = '/nodepool/requests'
NODE_ROOT = '/nodepool/nodes'
- LAUNCHER_ROOT = '/nodepool/launchers'
+ COMPONENT_ROOT = '/nodepool/components'
log = logging.getLogger("zuul.test.FakeNodepool")
@@ -3726,10 +3732,11 @@ class FakeNodepool(object):
self.fulfillRequest(req)
def registerLauncher(self, labels=["label1"], id="FakeLauncher"):
- path = os.path.join(self.LAUNCHER_ROOT, id)
+ path = os.path.join(self.COMPONENT_ROOT, 'pool', id)
data = {'id': id, 'supported_labels': labels}
self.client.create(
- path, json.dumps(data).encode('utf8'), makepath=True)
+ path, json.dumps(data).encode('utf8'),
+ ephemeral=True, makepath=True, sequence=True)
def getNodeRequests(self):
try:
diff --git a/tests/fixtures/layouts/job-dedup-false.yaml b/tests/fixtures/layouts/job-dedup-false.yaml
index 2c0e6ee2e..9254f0b41 100644
--- a/tests/fixtures/layouts/job-dedup-false.yaml
+++ b/tests/fixtures/layouts/job-dedup-false.yaml
@@ -39,6 +39,7 @@
- job:
name: common-job
deduplicate: false
+ pre-run: playbooks/pre.yaml
required-projects:
- org/project1
- org/project2
diff --git a/tests/fixtures/layouts/job-dedup-noop.yaml b/tests/fixtures/layouts/job-dedup-noop.yaml
new file mode 100644
index 000000000..9383fd8b6
--- /dev/null
+++ b/tests/fixtures/layouts/job-dedup-noop.yaml
@@ -0,0 +1,55 @@
+- queue:
+ name: integrated
+ allow-circular-dependencies: true
+
+- pipeline:
+ name: gate
+ manager: dependent
+ success-message: Build succeeded (gate).
+ require:
+ gerrit:
+ approval:
+ - Approved: 1
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - Approved: 1
+ success:
+ gerrit:
+ Verified: 2
+ submit: true
+ failure:
+ gerrit:
+ Verified: -2
+ start:
+ gerrit:
+ Verified: 0
+ precedence: high
+
+- job:
+ name: base
+ parent: null
+ pre-run: playbooks/pre.yaml
+ run: playbooks/run.yaml
+ nodeset:
+ nodes:
+ - label: debian
+ name: controller
+
+- job:
+ name: common-job
+ required-projects:
+ - org/project1
+
+- job:
+ name: project1-job
+
+- project:
+ name: org/project1
+ queue: integrated
+ gate:
+ jobs:
+ - noop
+ - common-job
+ - project1-job
diff --git a/tests/fixtures/layouts/trigger-sequence.yaml b/tests/fixtures/layouts/trigger-sequence.yaml
new file mode 100644
index 000000000..31db734b1
--- /dev/null
+++ b/tests/fixtures/layouts/trigger-sequence.yaml
@@ -0,0 +1,75 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- pipeline:
+ name: gate
+ manager: dependent
+ success-message: Build succeeded (gate).
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - Approved: 1
+ success:
+ gerrit:
+ Verified: 2
+ submit: true
+ failure:
+ gerrit:
+ Verified: -2
+ start:
+ gerrit:
+ Verified: 0
+ precedence: high
+
+- pipeline:
+ name: post
+ manager: independent
+ trigger:
+ gerrit:
+ - event: ref-updated
+ ref: ^(?!refs/).*$
+
+- pipeline:
+ name: tag
+ manager: independent
+ trigger:
+ gerrit:
+ - event: ref-updated
+ ref: ^refs/tags/.*$
+
+- job:
+ name: base
+ parent: null
+ run: playbooks/base.yaml
+ nodeset:
+ nodes:
+ - label: ubuntu-xenial
+ name: controller
+
+- job:
+ name: check-job
+ run: playbooks/check.yaml
+
+- job:
+ name: post-job
+ run: playbooks/post.yaml
+
+- project:
+ name: org/project
+ check:
+ jobs:
+ - check-job
+ gate:
+ jobs:
+ - check-job
diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py
index ac5ad13f5..2223008d5 100644
--- a/tests/unit/test_circular_dependencies.py
+++ b/tests/unit/test_circular_dependencies.py
@@ -1703,6 +1703,151 @@ class TestGerritCircularDependencies(ZuulTestCase):
], ordered=False)
self.assertEqual(len(self.fake_nodepool.history), 3)
+ @simple_layout('layouts/job-dedup-false.yaml')
+ def test_job_deduplication_false_failed_job(self):
+ # Test that if we are *not* deduplicating jobs, we don't
+ # duplicate the result on two different builds.
+ # The way we check that is to retry the common-job between two
+ # items, but only once, and only on one item. The other item
+ # should be unaffected.
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+
+ # A <-> B
+ A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ A.subject, B.data["url"]
+ )
+ B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ B.subject, A.data["url"]
+ )
+
+ A.addApproval('Code-Review', 2)
+ B.addApproval('Code-Review', 2)
+
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+
+ # If we don't make sure these jobs finish first, then one of
+ # the items may complete before the other and cause Zuul to
+ # abort the project*-job on the other item (with a "bundle
+ # failed to merge" error).
+ self.waitUntilSettled()
+ for build in self.builds:
+ if build.name == 'common-job' and build.project == 'org/project1':
+ break
+ else:
+ raise Exception("Unable to find build")
+ build.should_retry = True
+
+ # Store a reference to the queue items so we can inspect their
+ # internal attributes later to double check the retry build
+ # count is correct.
+ tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+ pipeline = tenant.layout.pipelines['gate']
+ items = pipeline.getAllItems()
+ self.assertEqual(len(items), 2)
+
+ self.executor_server.release('project1-job')
+ self.executor_server.release('project2-job')
+ self.waitUntilSettled()
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertHistory([
+ dict(name="project2-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="project1-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="common-job", result=None, changes="2,1 1,1"),
+ dict(name="common-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="common-job", result="SUCCESS", changes="2,1 1,1"),
+ ], ordered=False)
+ self.assertEqual(len(self.fake_nodepool.history), 5)
+ self.assertEqual(items[0].change.project.name, 'org/project2')
+ self.assertEqual(len(items[0].current_build_set.retry_builds), 0)
+ self.assertEqual(items[1].change.project.name, 'org/project1')
+ self.assertEqual(len(items[1].current_build_set.retry_builds), 1)
+
+ @simple_layout('layouts/job-dedup-auto-shared.yaml')
+ def test_job_deduplication_multi_scheduler(self):
+ # Test that a second scheduler can correctly refresh
+ # deduplicated builds
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+
+ # A <-> B
+ A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ A.subject, B.data["url"]
+ )
+ B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ B.subject, A.data["url"]
+ )
+
+ A.addApproval('Code-Review', 2)
+ B.addApproval('Code-Review', 2)
+
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+
+ self.waitUntilSettled()
+
+ app = self.createScheduler()
+ app.start()
+ self.assertEqual(len(self.scheds), 2)
+
+ # Hold the lock on the first scheduler so that only the second
+ # will act.
+ with self.scheds.first.sched.run_handler_lock:
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled(matcher=[app])
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertHistory([
+ dict(name="project1-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="common-job", result="SUCCESS", changes="2,1 1,1"),
+ ], ordered=False)
+
+ @simple_layout('layouts/job-dedup-noop.yaml')
+ def test_job_deduplication_noop(self):
+ # Test that we don't deduplicate noop (there's no good reason
+ # to do so)
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+
+ # A <-> B
+ A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ A.subject, B.data["url"]
+ )
+ B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ B.subject, A.data["url"]
+ )
+
+ A.addApproval('Code-Review', 2)
+ B.addApproval('Code-Review', 2)
+
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertHistory([
+ dict(name="project1-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="common-job", result="SUCCESS", changes="2,1 1,1"),
+ ], ordered=False)
+ # It's tricky to get info about a noop build, but the jobs in
+ # the report have the build UUID, so we make sure it's
+ # different.
+ a_noop = [l for l in A.messages[-1].split('\n') if 'noop' in l][0]
+ b_noop = [l for l in B.messages[-1].split('\n') if 'noop' in l][0]
+ self.assertNotEqual(a_noop, b_noop)
+
@simple_layout('layouts/job-dedup-retry.yaml')
def test_job_deduplication_retry(self):
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py
index f0f9027bd..aa2bb1758 100644
--- a/tests/unit/test_gerrit.py
+++ b/tests/unit/test_gerrit.py
@@ -699,6 +699,7 @@ class TestPolling(ZuulTestCase):
files=file_dict)
A.setMerged()
self.waitForPoll('gerrit')
+ self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
B.setCheck('zuul:check', reset=True)
diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py
index 1bfba36bb..49dae0ccb 100644
--- a/tests/unit/test_github_driver.py
+++ b/tests/unit/test_github_driver.py
@@ -38,7 +38,7 @@ from tests.base import (AnsibleZuulTestCase, BaseTestCase,
simple_layout, random_sha1)
from tests.base import ZuulWebFixture
-EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
+EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
class TestGithubDriver(ZuulTestCase):
@@ -1384,7 +1384,7 @@ class TestGithubDriver(ZuulTestCase):
# now check if the merge was done via rebase
merges = [report for report in self.fake_github.github_data.reports
if report[2] == 'merge']
- assert(len(merges) == 1 and merges[0][3] == 'squash')
+ assert (len(merges) == 1 and merges[0][3] == 'squash')
@simple_layout('layouts/basic-github.yaml', driver='github')
def test_invalid_event(self):
diff --git a/tests/unit/test_gitlab_driver.py b/tests/unit/test_gitlab_driver.py
index 2715cdef1..6c4d4eeb9 100644
--- a/tests/unit/test_gitlab_driver.py
+++ b/tests/unit/test_gitlab_driver.py
@@ -28,7 +28,7 @@ from tests.base import ZuulTestCase, ZuulWebFixture
from testtools.matchers import MatchesRegex
-EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
+EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
class TestGitlabWebhook(ZuulTestCase):
diff --git a/tests/unit/test_pagure_driver.py b/tests/unit/test_pagure_driver.py
index 32635a389..92159cc9f 100644
--- a/tests/unit/test_pagure_driver.py
+++ b/tests/unit/test_pagure_driver.py
@@ -26,7 +26,7 @@ from zuul.zk.layout import LayoutState
from tests.base import ZuulTestCase, simple_layout
from tests.base import ZuulWebFixture
-EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
+EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
class TestPagureDriver(ZuulTestCase):
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index c6865a8d7..5e0385be3 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -50,8 +50,9 @@ from tests.base import (
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.layout import LayoutState
+from zuul.zk.locks import management_queue_lock
-EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
+EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
class TestSchedulerSSL(SSLZuulTestCase):
@@ -451,6 +452,7 @@ class TestScheduler(ZuulTestCase):
'zuul.tenant.tenant-one.reconfiguration_time',
'zuul.tenant.tenant-one.pipeline.gate.event_enqueue_time',
'zuul.tenant.tenant-one.pipeline.gate.merge_request_time',
+ 'zuul.tenant.tenant-one.pipeline.gate.merger_merge_op_time',
'zuul.tenant.tenant-one.pipeline.gate.job_freeze_time',
'zuul.tenant.tenant-one.pipeline.gate.node_request_time',
'zuul.tenant.tenant-one.pipeline.gate.job_wait_time',
@@ -4214,6 +4216,56 @@ class TestScheduler(ZuulTestCase):
dict(name='check-job', result='SUCCESS', changes='1,1'),
])
+ @simple_layout('layouts/trigger-sequence.yaml')
+ def test_live_reconfiguration_trigger_sequence(self):
+ # Test that events arriving after an event that triggers a
+ # reconfiguration are handled after the reconfiguration
+ # completes.
+
+ in_repo_conf = "[{project: {tag: {jobs: [post-job]}}}]"
+ file_dict = {'zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ sched = self.scheds.first.sched
+ # Hold the management queue so that we don't process any
+ # reconfiguration events yet.
+ with management_queue_lock(
+ self.zk_client, 'tenant-one', blocking=False
+ ):
+ with sched.run_handler_lock:
+ A.setMerged()
+ # Submit two events while no processing is happening:
+ # A change merged event that will trigger a reconfiguration
+ self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+
+ # And a tag event which should only run a job after
+ # the config change above is in effect.
+ event = self.fake_gerrit.addFakeTag(
+ 'org/project', 'master', 'foo')
+ self.fake_gerrit.addEvent(event)
+
+ # Wait for the tenant trigger queue to empty out, and for
+ # us to have a tenant management as well as a pipeline
+ # trigger event. At this point, we should be deferring
+ # the trigger event until the management event is handled.
+ for _ in iterate_timeout(60, 'queues'):
+ with sched.run_handler_lock:
+ if sched.trigger_events['tenant-one'].hasEvents():
+ continue
+ if not sched.pipeline_trigger_events[
+ 'tenant-one']['tag'].hasEvents():
+ continue
+ if not sched.management_events['tenant-one'].hasEvents():
+ continue
+ break
+
+ # Now we can resume and process the reconfiguration event
+ sched.wake_event.set()
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='post-job', result='SUCCESS'),
+ ])
+
@simple_layout('layouts/repo-deleted.yaml')
def test_repo_deleted(self):
self.init_repo("org/delete-project")
@@ -5680,8 +5732,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(A.reported, 2)
self.assertTrue(re.search('project-merge .* NODE_FAILURE',
A.messages[1]))
- self.assertTrue(re.search('project-test1 .* SKIPPED', A.messages[1]))
- self.assertTrue(re.search('project-test2 .* SKIPPED', A.messages[1]))
+ self.assertTrue('Skipped 2 jobs' in A.messages[1])
def test_nodepool_resources(self):
"Test that resources are reported"
@@ -6803,7 +6854,7 @@ class TestDependencyGraph(ZuulTestCase):
self.assertHistory([
dict(name='build', result='FAILURE', changes='1,1'),
], ordered=False)
- self.assertIn('SKIPPED', A.messages[0])
+ self.assertIn('Skipped 1 job', A.messages[0])
class TestDuplicatePipeline(ZuulTestCase):
diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py
index 7d95f2548..f371a8064 100644
--- a/tests/unit/test_sos.py
+++ b/tests/unit/test_sos.py
@@ -55,6 +55,79 @@ class TestScaleOutScheduler(ZuulTestCase):
dict(name='project-test2', result='SUCCESS', changes='1,1'),
], ordered=False)
+ def test_pipeline_cache_clear(self):
+ # Test that the pipeline cache on a second scheduler isn't
+ # holding old change objects.
+
+ # Hold jobs in build
+ sched1 = self.scheds.first
+ self.executor_server.hold_jobs_in_build = True
+
+ # We need a pair of changes in order to populate the pipeline
+ # change cache (a single change doesn't activate the cache,
+ # it's for dependencies).
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('Code-Review', 2)
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.addApproval('Code-Review', 2)
+ B.addApproval('Approved', 1)
+ B.setDependsOn(A, 1)
+
+ # Fail a job
+ self.executor_server.failJob('project-test1', A)
+
+ # Enqueue into gate with scheduler 1
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ # Start scheduler 2
+ sched2 = self.createScheduler()
+ sched2.start()
+ self.assertEqual(len(self.scheds), 2)
+
+ # Pause scheduler 1
+ with sched1.sched.run_handler_lock:
+ # Release jobs
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ # Wait for scheduler 2 to dequeue
+ self.waitUntilSettled(matcher=[sched2])
+ # Unpause scheduler 1
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+
+ # Clear zk change cache
+ self.fake_gerrit._change_cache.prune([], max_age=0)
+
+ # At this point, scheduler 1 should have a bogus change entry
+ # in the pipeline cache because scheduler 2 performed the
+ # dequeue so scheduler 1 never cleaned up its cache.
+
+ self.executor_server.fail_tests.clear()
+ self.executor_server.hold_jobs_in_build = True
+ # Pause scheduler 1
+ with sched1.sched.run_handler_lock:
+ # Enqueue into gate with scheduler 2
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled(matcher=[sched2])
+
+ # Pause scheduler 2
+ with sched2.sched.run_handler_lock:
+ # Make sure that scheduler 1 does some pipeline runs which
+ # reconstitute state from ZK. This gives it the
+ # opportunity to use old cache data if we don't clear it.
+
+ # Release job1
+ self.executor_server.release()
+ self.waitUntilSettled(matcher=[sched1])
+ # Release job2
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ # Wait for scheduler 1 to merge change
+ self.waitUntilSettled(matcher=[sched1])
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+
@simple_layout('layouts/multi-scheduler-status.yaml')
def test_multi_scheduler_status(self):
self.hold_merge_jobs_in_queue = True
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 9fd8ea1d3..a89bb3007 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -4930,7 +4930,7 @@ class TestDataReturn(AnsibleZuulTestCase):
self.assertIn(
'- data-return https://zuul.example.com/',
A.messages[-1])
- self.assertTrue(re.search('child .* SKIPPED', A.messages[-1]))
+ self.assertTrue('Skipped 1 job' in A.messages[-1])
self.assertIn('Build succeeded', A.messages[-1])
def test_data_return_invalid_child_job(self):
@@ -4943,7 +4943,7 @@ class TestDataReturn(AnsibleZuulTestCase):
self.assertIn(
'- data-return-invalid-child-job https://zuul.example.com',
A.messages[-1])
- self.assertTrue(re.search('data-return .* SKIPPED', A.messages[-1]))
+ self.assertTrue('Skipped 1 job' in A.messages[-1])
self.assertIn('Build succeeded', A.messages[-1])
def test_data_return_skip_all_child_jobs(self):
@@ -4957,8 +4957,7 @@ class TestDataReturn(AnsibleZuulTestCase):
self.assertIn(
'- data-return-skip-all https://zuul.example.com/',
A.messages[-1])
- self.assertTrue(re.search('child .* SKIPPED', A.messages[-1]))
- self.assertTrue(re.search('data-return .* SKIPPED', A.messages[-1]))
+ self.assertTrue('Skipped 2 jobs' in A.messages[-1])
self.assertIn('Build succeeded', A.messages[-1])
def test_data_return_skip_all_child_jobs_with_soft_dependencies(self):
@@ -4972,8 +4971,7 @@ class TestDataReturn(AnsibleZuulTestCase):
])
self.assertIn('- data-return-cd https://zuul.example.com/',
A.messages[-1])
- self.assertTrue(re.search('data-return-a .* SKIPPED', A.messages[-1]))
- self.assertTrue(re.search('data-return-b .* SKIPPED', A.messages[-1]))
+ self.assertTrue('Skipped 2 jobs' in A.messages[-1])
self.assertIn('Build succeeded', A.messages[-1])
def test_several_zuul_return(self):
@@ -4987,7 +4985,7 @@ class TestDataReturn(AnsibleZuulTestCase):
self.assertIn(
'- several-zuul-return-child https://zuul.example.com/',
A.messages[-1])
- self.assertTrue(re.search('data-return .* SKIPPED', A.messages[-1]))
+ self.assertTrue('Skipped 1 job' in A.messages[-1])
self.assertIn('Build succeeded', A.messages[-1])
def test_data_return_skip_retry(self):
@@ -6905,7 +6903,7 @@ class TestJobPause(AnsibleZuulTestCase):
dict(name='compile', result='SUCCESS', changes='1,1'),
])
- self.assertTrue(re.search('test .* SKIPPED', A.messages[0]))
+ self.assertTrue('Skipped 1 job' in A.messages[0])
def test_job_pause_pre_skipped_child(self):
"""
@@ -6953,7 +6951,7 @@ class TestJobPause(AnsibleZuulTestCase):
dict(name='compile', result='SUCCESS', changes='1,1'),
])
- self.assertTrue(re.search('test .* SKIPPED', A.messages[0]))
+ self.assertTrue('Skipped 1 job' in A.messages[0])
def test_job_pause_skipped_child_retry(self):
"""
@@ -7822,7 +7820,7 @@ class TestProvidesRequiresMysql(ZuulTestCase):
dict(name='image-builder', result='FAILURE', changes='1,1'),
dict(name='hold', result='SUCCESS', changes='1,1'),
], ordered=False)
- self.assertTrue(re.search('image-user .* SKIPPED', A.messages[0]))
+ self.assertTrue('Skipped 1 job' in A.messages[0])
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index eadf5c855..b1f393e47 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -1290,7 +1290,7 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
"github": 456,
}
state = LayoutState("tenant", "hostname", 0, layout_uuid,
- branch_cache_min_ltimes)
+ branch_cache_min_ltimes, -1)
store["tenant"] = state
self.assertEqual(state, store["tenant"])
self.assertNotEqual(state.ltime, -1)
@@ -1301,9 +1301,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
def test_ordering(self):
layout_uuid = uuid.uuid4().hex
state_one = LayoutState("tenant", "hostname", 1, layout_uuid,
- {}, ltime=1)
+ {}, -1, ltime=1)
state_two = LayoutState("tenant", "hostname", 2, layout_uuid,
- {}, ltime=2)
+ {}, -1, ltime=2)
self.assertGreater(state_two, state_one)
@@ -1312,9 +1312,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
min_ltimes = defaultdict(lambda x: -1)
min_ltimes['foo'] = 1
state_one = LayoutState("tenant", "hostname", 1, uuid.uuid4().hex,
- {}, ltime=1)
+ {}, -1, ltime=1)
state_two = LayoutState("tenant", "hostname", 2, uuid.uuid4().hex,
- {}, ltime=2)
+ {}, -1, ltime=2)
store.setMinLtimes(state_one, min_ltimes)
store.setMinLtimes(state_two, min_ltimes)
store['tenant'] = state_one
diff --git a/web/src/containers/status/ChangePanel.jsx b/web/src/containers/status/ChangePanel.jsx
index 33fc4687c..dd4fc27e5 100644
--- a/web/src/containers/status/ChangePanel.jsx
+++ b/web/src/containers/status/ChangePanel.jsx
@@ -18,6 +18,7 @@ import { connect } from 'react-redux'
import { Link } from 'react-router-dom'
import * as moment from 'moment'
import 'moment-duration-format'
+import { Button } from '@patternfly/react-core'
class ChangePanel extends React.Component {
@@ -30,9 +31,11 @@ class ChangePanel extends React.Component {
constructor () {
super()
this.state = {
- expanded: false
+ expanded: false,
+ showSkipped: false,
}
this.onClick = this.onClick.bind(this)
+ this.toggleSkippedJobs = this.toggleSkippedJobs.bind(this)
this.clicked = false
}
@@ -120,12 +123,13 @@ class ChangePanel extends React.Component {
}
renderProgressBar (change) {
- let jobPercent = (100 / change.jobs.length).toFixed(2)
+ const interesting_jobs = change.jobs.filter(j => this.jobStrResult(j) !== 'skipped')
+ let jobPercent = (100 / interesting_jobs.length).toFixed(2)
return (
<div className='progress zuul-change-total-result'>
{change.jobs.map((job, idx) => {
let result = this.jobStrResult(job)
- if (['queued', 'waiting'].includes(result)) {
+ if (['queued', 'waiting', 'skipped'].includes(result)) {
return ''
}
let className = ''
@@ -144,7 +148,6 @@ class ChangePanel extends React.Component {
className = ' progress-bar-warning'
break
case 'paused':
- case 'skipped':
className = ' progress-bar-info'
break
default:
@@ -302,15 +305,39 @@ class ChangePanel extends React.Component {
</span>)
}
+ toggleSkippedJobs (e) {
+ // Skip middle mouse button
+ if (e.button === 1) {
+ return
+ }
+ this.setState({ showSkipped: !this.state.showSkipped })
+ }
+
renderJobList (jobs, times) {
+ const [buttonText, interestingJobs] = this.state.showSkipped ?
+ ['Hide', jobs] :
+ ['Show', jobs.filter(j => this.jobStrResult(j) !== 'skipped')]
+ const skippedJobCount = jobs.length - interestingJobs.length
+
return (
- <ul className='list-group zuul-patchset-body'>
- {jobs.map((job, idx) => (
- <li key={idx} className='list-group-item zuul-change-job'>
- {this.renderJob(job, times.jobs[job.name])}
- </li>
- ))}
- </ul>)
+ <>
+ <ul className='list-group zuul-patchset-body'>
+ {interestingJobs.map((job, idx) => (
+ <li key={idx} className='list-group-item zuul-change-job'>
+ {this.renderJob(job, times.jobs[job.name])}
+ </li>
+ ))}
+ {(this.state.showSkipped || skippedJobCount) ? (
+ <li key='last' className='list-group-item zuul-change-job'>
+ <Button variant="link" className='zuul-skipped-jobs-button'
+ onClick={this.toggleSkippedJobs}>
+ {buttonText} {skippedJobCount ? skippedJobCount : ''} skipped job{skippedJobCount === 1 ? '' : 's'}
+ </Button>
+ </li>
+ ) : ''}
+ </ul>
+ </>
+ )
}
calculateTimes (change) {
diff --git a/web/src/containers/status/ChangePanel.test.jsx b/web/src/containers/status/ChangePanel.test.jsx
index 5a4be8602..cd27edc73 100644
--- a/web/src/containers/status/ChangePanel.test.jsx
+++ b/web/src/containers/status/ChangePanel.test.jsx
@@ -16,6 +16,7 @@ import React from 'react'
import { Link, BrowserRouter as Router } from 'react-router-dom'
import { Provider } from 'react-redux'
import { create } from 'react-test-renderer'
+import { Button } from '@patternfly/react-core'
import { setTenantAction } from '../../actions/tenant'
import configureStore from '../../store'
@@ -45,6 +46,8 @@ it('change panel render multi tenant links', () => {
const jobLink = application.root.findByType(Link)
expect(jobLink.props.to).toEqual(
'/t/tenant-one/stream/42')
+ const skipButton = application.root.findAllByType(Button)
+ expect(skipButton === undefined)
})
it('change panel render white-label tenant links', () => {
@@ -60,4 +63,29 @@ it('change panel render white-label tenant links', () => {
const jobLink = application.root.findByType(Link)
expect(jobLink.props.to).toEqual(
'/stream/42')
+ const skipButton = application.root.findAllByType(Button)
+ expect(skipButton === undefined)
+})
+
+it('change panel skip jobs', () => {
+ const fakeChange = {
+ project: 'org-project',
+ jobs: [{
+ name: 'job-name',
+ url: 'stream/42',
+ result: 'skipped'
+ }]
+ }
+
+ const store = configureStore()
+ store.dispatch(setTenantAction('tenant-one', true))
+ const application = create(
+ <Provider store={store}>
+ <Router>
+ <ChangePanel change={fakeChange} globalExpanded={true} />
+ </Router>
+ </Provider>
+ )
+ const skipButton = application.root.findByType(Button)
+ expect(skipButton.props.children.includes('skipped job'))
})
diff --git a/web/src/index.css b/web/src/index.css
index e8c67a372..eddbca673 100644
--- a/web/src/index.css
+++ b/web/src/index.css
@@ -189,6 +189,11 @@ a.refresh {
font-size: small;
}
+.zuul-skipped-jobs-button {
+ font-size: small;
+ padding: 0;
+}
+
.zuul-non-voting-desc {
font-size: smaller;
}
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 490e47c59..6d6f16968 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -1044,6 +1044,7 @@ class Client(zuul.cmd.ZuulApp):
tenant_name=args.tenant,
hostname='admin command',
last_reconfigured=int(time.time()),
+ last_reconfigure_event_ltime=-1,
uuid=uuid4().hex,
branch_cache_min_ltimes={},
ltime=ps._zstat.last_modified_transaction_id,
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 6aea4388b..1ec334915 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -1470,7 +1470,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
# for large projects like nova
alldata = []
chunk, more_changes = _query_chunk(query, event)
- while(chunk):
+ while chunk:
alldata.extend(chunk)
if more_changes is None:
# continue sortKey based (before Gerrit 2.9)
diff --git a/zuul/driver/github/githubreporter.py b/zuul/driver/github/githubreporter.py
index de62f2565..73d3a6f13 100644
--- a/zuul/driver/github/githubreporter.py
+++ b/zuul/driver/github/githubreporter.py
@@ -135,9 +135,12 @@ class GithubReporter(BaseReporter):
def _formatItemReportJobs(self, item):
# Return the list of jobs portion of the report
ret = ''
- jobs_fields = self._getItemReportJobsFields(item)
+ jobs_fields, skipped = self._getItemReportJobsFields(item)
for job_fields in jobs_fields:
ret += self._formatJobResult(job_fields)
+ if skipped:
+ jobtext = 'job' if skipped == 1 else 'jobs'
+ ret += 'Skipped %i %s\n' % (skipped, jobtext)
return ret
def addPullComment(self, item, comment=None):
diff --git a/zuul/driver/pagure/pagurereporter.py b/zuul/driver/pagure/pagurereporter.py
index 0bfdbc9b8..b38035752 100644
--- a/zuul/driver/pagure/pagurereporter.py
+++ b/zuul/driver/pagure/pagurereporter.py
@@ -67,9 +67,12 @@ class PagureReporter(BaseReporter):
def _formatItemReportJobs(self, item):
# Return the list of jobs portion of the report
ret = ''
- jobs_fields = self._getItemReportJobsFields(item)
+ jobs_fields, skipped = self._getItemReportJobsFields(item)
for job_fields in jobs_fields:
ret += '- [%s](%s) : %s%s%s%s\n' % job_fields[:6]
+ if skipped:
+ jobtext = 'job' if skipped == 1 else 'jobs'
+ ret += 'Skipped %i %s\n' % (skipped, jobtext)
return ret
def addPullComment(self, item, comment=None):
diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py
index 4f11a583b..619b1a8ff 100644
--- a/zuul/driver/timer/__init__.py
+++ b/zuul/driver/timer/__init__.py
@@ -130,6 +130,24 @@ class TimerDriver(Driver, TriggerInterface):
pipeline.name)
continue
+ self._addJobsInner(tenant, pipeline, trigger, timespec,
+ jobs)
+
+ def _addJobsInner(self, tenant, pipeline, trigger, timespec, jobs):
+ # jobs is a list that we mutate
+ for project_name, pcs in tenant.layout.project_configs.items():
+ # timer operates on branch heads and doesn't need
+ # speculative layouts to decide if it should be
+ # enqueued or not. So it can be decided on cached
+ # data if it needs to run or not.
+ pcst = tenant.layout.getAllProjectConfigs(project_name)
+ if not [True for pc in pcst if pipeline.name in pc.pipelines]:
+ continue
+
+ (trusted, project) = tenant.getProject(project_name)
+ try:
+ for branch in project.source.getProjectBranches(
+ project, tenant):
# The 'misfire_grace_time' argument is set to None to
# disable checking if the job missed its run time window.
# This ensures we don't miss a trigger when the job is
@@ -137,11 +155,17 @@ class TimerDriver(Driver, TriggerInterface):
# delays are not a problem for our trigger use-case.
job = self.apsched.add_job(
self._onTrigger, trigger=trigger,
- args=(tenant, pipeline.name, timespec,),
+ args=(tenant, pipeline.name, project_name,
+ branch, timespec,),
misfire_grace_time=None)
jobs.append(job)
+ except Exception:
+ self.log.exception("Unable to create APScheduler job for "
+ "%s %s %s",
+ tenant, pipeline, project)
- def _onTrigger(self, tenant, pipeline_name, timespec):
+ def _onTrigger(self, tenant, pipeline_name, project_name, branch,
+ timespec):
if not self.election_won:
return
@@ -150,55 +174,43 @@ class TimerDriver(Driver, TriggerInterface):
return
try:
- self._dispatchEvent(tenant, pipeline_name, timespec)
+ self._dispatchEvent(tenant, pipeline_name, project_name,
+ branch, timespec)
except Exception:
self.stop_event.set()
self.log.exception("Error when dispatching timer event")
- def _dispatchEvent(self, tenant, pipeline_name, timespec):
- self.log.debug('Got trigger for tenant %s and pipeline %s with '
- 'timespec %s', tenant.name, pipeline_name, timespec)
- for project_name, pcs in tenant.layout.project_configs.items():
- try:
- # timer operates on branch heads and doesn't need
- # speculative layouts to decide if it should be
- # enqueued or not. So it can be decided on cached
- # data if it needs to run or not.
- pcst = tenant.layout.getAllProjectConfigs(project_name)
- if not [True for pc in pcst if pipeline_name in pc.pipelines]:
- continue
-
- (trusted, project) = tenant.getProject(project_name)
- for branch in project.source.getProjectBranches(
- project, tenant):
- try:
- event = TimerTriggerEvent()
- event.type = 'timer'
- event.timespec = timespec
- event.forced_pipeline = pipeline_name
- event.project_hostname = project.canonical_hostname
- event.project_name = project.name
- event.ref = 'refs/heads/%s' % branch
- event.branch = branch
- event.zuul_event_id = str(uuid4().hex)
- event.timestamp = time.time()
- # Refresh the branch in order to update the item in the
- # change cache.
- change_key = project.source.getChangeKey(event)
- with self.project_update_locks[project.canonical_name]:
- project.source.getChange(change_key, refresh=True,
- event=event)
- log = get_annotated_logger(self.log, event)
- log.debug("Adding event")
- self.sched.addTriggerEvent(self.name, event)
- except Exception:
- self.log.exception("Error dispatching timer event for "
- "project %s branch %s",
- project, branch)
- except Exception:
- self.log.exception("Error dispatching timer event for "
- "project %s",
- project)
+ def _dispatchEvent(self, tenant, pipeline_name, project_name,
+ branch, timespec):
+ self.log.debug('Got trigger for tenant %s and pipeline %s '
+ 'project %s branch %s with timespec %s',
+ tenant.name, pipeline_name, project_name,
+ branch, timespec)
+ try:
+ (trusted, project) = tenant.getProject(project_name)
+ event = TimerTriggerEvent()
+ event.type = 'timer'
+ event.timespec = timespec
+ event.forced_pipeline = pipeline_name
+ event.project_hostname = project.canonical_hostname
+ event.project_name = project.name
+ event.ref = 'refs/heads/%s' % branch
+ event.branch = branch
+ event.zuul_event_id = str(uuid4().hex)
+ event.timestamp = time.time()
+ # Refresh the branch in order to update the item in the
+ # change cache.
+ change_key = project.source.getChangeKey(event)
+ with self.project_update_locks[project.canonical_name]:
+ project.source.getChange(change_key, refresh=True,
+ event=event)
+ log = get_annotated_logger(self.log, event)
+ log.debug("Adding event")
+ self.sched.addTriggerEvent(self.name, event)
+ except Exception:
+ self.log.exception("Error dispatching timer event for "
+ "tenant %s project %s branch %s",
+ tenant, project_name, branch)
def stop(self):
self.stopped = True
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 2165a797b..eac7fa7e5 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -3735,7 +3735,7 @@ class ExecutorServer(BaseMergeServer):
sensor.reportStats(self.statsd, base_key)
def finishJob(self, unique):
- del(self.job_workers[unique])
+ del self.job_workers[unique]
self.log.debug(
"Finishing Job: %s, queue(%d): %s",
unique,
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 642aededd..a66f5ad22 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -235,6 +235,9 @@ class PipelineManager(metaclass=ABCMeta):
resolved_changes.append(change)
return resolved_changes
+ def clearCache(self):
+ self._change_cache.clear()
+
def _maintainCache(self):
active_layout_uuids = set()
referenced_change_keys = set()
@@ -1768,9 +1771,12 @@ class PipelineManager(metaclass=ABCMeta):
build_in_items = [item]
if item.bundle:
for other_item in item.bundle.items:
- if other_item not in build_in_items:
- if other_item.current_build_set.getBuild(build.job.name):
- build_in_items.append(other_item)
+ if other_item in build_in_items:
+ continue
+ other_build = other_item.current_build_set.getBuild(
+ build.job.name)
+ if other_build is not None and other_build is build:
+ build_in_items.append(other_item)
for item in build_in_items:
# We don't care about some actions below if this build
# isn't in the current buildset, so determine that before
@@ -1817,18 +1823,27 @@ class PipelineManager(metaclass=ABCMeta):
# We're the second of the files/merger pair, report the stat
self.reportPipelineTiming('merge_request_time',
build_set.configured_time)
+ if event.elapsed_time:
+ self.reportPipelineTiming('merger_files_changes_op_time',
+ event.elapsed_time, elapsed=True)
def onMergeCompleted(self, event, build_set):
if build_set.merge_state == build_set.COMPLETE:
self._onGlobalRepoStateCompleted(event, build_set)
self.reportPipelineTiming('repo_state_time',
build_set.repo_state_request_time)
+ if event.elapsed_time:
+ self.reportPipelineTiming('merger_repo_state_op_time',
+ event.elapsed_time, elapsed=True)
else:
self._onMergeCompleted(event, build_set)
if build_set.files_state == build_set.COMPLETE:
# We're the second of the files/merger pair, report the stat
self.reportPipelineTiming('merge_request_time',
build_set.configured_time)
+ if event.elapsed_time:
+ self.reportPipelineTiming('merger_merge_op_time',
+ event.elapsed_time, elapsed=True)
def _onMergeCompleted(self, event, build_set):
@@ -2120,7 +2135,7 @@ class PipelineManager(metaclass=ABCMeta):
except Exception:
self.log.exception("Exception reporting pipeline stats")
- def reportPipelineTiming(self, key, start, end=None):
+ def reportPipelineTiming(self, key, start, end=None, elapsed=False):
if not self.sched.statsd:
return
if not start:
@@ -2130,5 +2145,8 @@ class PipelineManager(metaclass=ABCMeta):
pipeline = self.pipeline
tenant = pipeline.tenant
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
- dt = (end - start) * 1000
+ if elapsed:
+ dt = start
+ else:
+ dt = (end - start) * 1000
self.sched.statsd.timing(f'{stats_key}.{key}', dt)
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index 362644b98..29fa39aaf 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -159,7 +159,9 @@ class MergeClient(object):
"via result event for %s", merge_request)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
- merge_request.build_set_uuid, files=None
+ merge_request.build_set_uuid,
+ files=None,
+ elapsed_time=None,
)
else:
event = MergeCompletedEvent(
@@ -172,6 +174,7 @@ class MergeClient(object):
repo_state=None,
item_in_branches=None,
errors=None,
+ elapsed_time=None,
)
try:
self.result_events[merge_request.tenant_name][
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 91597714f..fe5b938a1 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -256,6 +256,7 @@ class BaseMergeServer(metaclass=ABCMeta):
def executeMergeJob(self, merge_request, params):
result = None
+ start = time.monotonic()
if merge_request.job_type == MergeRequest.MERGE:
result = self.merge(merge_request, params)
elif merge_request.job_type == MergeRequest.CAT:
@@ -264,6 +265,8 @@ class BaseMergeServer(metaclass=ABCMeta):
result = self.refstate(merge_request, params)
elif merge_request.job_type == MergeRequest.FILES_CHANGES:
result = self.fileschanges(merge_request, params)
+ end = time.monotonic()
+ result['elapsed_time'] = end - start
return result
def cat(self, merge_request, args):
@@ -376,6 +379,7 @@ class BaseMergeServer(metaclass=ABCMeta):
item_in_branches = result.get("item_in_branches", [])
files = result.get("files", {})
errors = result.get("errors", [])
+ elapsed_time = result.get("elapsed_time")
log.info(
"Merge %s complete, merged: %s, updated: %s, commit: %s, "
@@ -407,7 +411,9 @@ class BaseMergeServer(metaclass=ABCMeta):
)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
- merge_request.build_set_uuid, files
+ merge_request.build_set_uuid,
+ files,
+ elapsed_time,
)
else:
event = MergeCompletedEvent(
@@ -420,6 +426,7 @@ class BaseMergeServer(metaclass=ABCMeta):
repo_state,
item_in_branches,
errors,
+ elapsed_time,
)
def put_complete_event(log, merge_request, event):
diff --git a/zuul/model.py b/zuul/model.py
index aa814ce6c..963332826 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -694,6 +694,11 @@ class PipelineState(zkobject.ZKObject):
return json.dumps(data, sort_keys=True).encode("utf8")
def deserialize(self, raw, context):
+ # We may have old change objects in the pipeline cache, so
+ # make sure they are the same objects we would get from the
+ # source change cache.
+ self.pipeline.manager.clearCache()
+
data = super().deserialize(raw, context)
existing_queues = {
q.getPath(): q for q in self.queues + self.old_queues
@@ -742,8 +747,56 @@ class PipelineState(zkobject.ZKObject):
"queues": queues,
"old_queues": old_queues,
})
+ if context.build_references:
+ self._fixBuildReferences(data, context)
+ context.build_references = False
return data
+ def _fixBuildReferences(self, data, context):
+ # Reconcile duplicate builds; if we find any BuildReference
+ # objects, look up the actual builds and replace
+ log = context.log
+ build_map = {}
+ to_replace_dicts = []
+ to_replace_lists = []
+ for queue in data['queues'] + data['old_queues']:
+ for item in queue.queue:
+ buildset = item.current_build_set
+ for build_job, build in buildset.builds.items():
+ if isinstance(build, BuildReference):
+ to_replace_dicts.append((item,
+ buildset.builds,
+ build_job,
+ build._path))
+ else:
+ build_map[build.getPath()] = build
+ for job_name, build_list in buildset.retry_builds.items():
+ for build in build_list:
+ if isinstance(build, BuildReference):
+ to_replace_lists.append((item,
+ build_list,
+ build,
+ build._path))
+ else:
+ build_map[build.getPath()] = build
+ for (item, build_dict, build_job, build_path) in to_replace_dicts:
+ orig_build = build_map.get(build_path)
+ if orig_build:
+ build_dict[build_job] = orig_build
+ else:
+ log.warning("Unable to find deduplicated build %s for %s",
+ build_path, item)
+ del build_dict[build_job]
+ for (item, build_list, build, build_path) in to_replace_lists:
+ idx = build_list.index(build)
+ orig_build = build_map.get(build_path)
+ if orig_build:
+ build_list[idx] = build_map[build_path]
+ else:
+ log.warning("Unable to find deduplicated build %s for %s",
+ build_path, item)
+ del build_list[idx]
+
def _getKnownItems(self):
items = []
for queue in (*self.old_queues, *self.queues):
@@ -3424,6 +3477,11 @@ class BuildRequest(JobRequest):
)
+class BuildReference:
+ def __init__(self, _path):
+ self._path = _path
+
+
class Build(zkobject.ZKObject):
"""A Build is an instance of a single execution of a Job.
@@ -3852,6 +3910,13 @@ class BuildSet(zkobject.ZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
+ def _isMyBuild(self, build_path):
+ parts = build_path.split('/')
+ buildset_uuid = parts[-5]
+ if buildset_uuid == self.uuid:
+ return True
+ return False
+
def deserialize(self, raw, context):
data = super().deserialize(raw, context)
# Set our UUID so that getPath() returns the correct path for
@@ -3944,8 +4009,12 @@ class BuildSet(zkobject.ZKObject):
if not build.result:
build.refresh(context)
else:
- build = Build.fromZK(
- context, build_path, job=job, build_set=self)
+ if not self._isMyBuild(build_path):
+ build = BuildReference(build_path)
+ context.build_references = True
+ else:
+ build = Build.fromZK(
+ context, build_path, job=job, build_set=self)
builds[job_name] = build
for retry_path in data["retry_builds"].get(job_name, []):
@@ -3954,8 +4023,12 @@ class BuildSet(zkobject.ZKObject):
# Retry builds never change.
pass
else:
- retry_build = Build.fromZK(
- context, retry_path, job=job, build_set=self)
+ if not self._isMyBuild(retry_path):
+ retry_build = BuildReference(retry_path)
+ context.build_references = True
+ else:
+ retry_build = Build.fromZK(
+ context, retry_path, job=job, build_set=self)
retry_builds[job_name].append(retry_build)
data.update({
@@ -5910,6 +5983,7 @@ class TenantReconfigureEvent(ManagementEvent):
self.tenant_name = tenant_name
self.project_branches = set([(project_name, branch_name)])
self.branch_cache_ltimes = {}
+ self.trigger_event_ltime = -1
self.merged_events = []
def __ne__(self, other):
@@ -5931,6 +6005,8 @@ class TenantReconfigureEvent(ManagementEvent):
self.branch_cache_ltimes.get(connection_name, ltime), ltime)
self.zuul_event_ltime = max(self.zuul_event_ltime,
other.zuul_event_ltime)
+ self.trigger_event_ltime = max(self.trigger_event_ltime,
+ other.trigger_event_ltime)
self.merged_events.append(other)
def toDict(self):
@@ -5938,6 +6014,7 @@ class TenantReconfigureEvent(ManagementEvent):
d["tenant_name"] = self.tenant_name
d["project_branches"] = list(self.project_branches)
d["branch_cache_ltimes"] = self.branch_cache_ltimes
+ d["trigger_event_ltime"] = self.trigger_event_ltime
return d
@classmethod
@@ -5953,6 +6030,7 @@ class TenantReconfigureEvent(ManagementEvent):
tuple(pb) for pb in data["project_branches"]
)
event.branch_cache_ltimes = data.get("branch_cache_ltimes", {})
+ event.trigger_event_ltime = data.get("trigger_event_ltime", -1)
return event
@@ -6175,12 +6253,13 @@ class MergeCompletedEvent(ResultEvent):
:arg dict repo_state: The starting repo state before the merge.
:arg list item_in_branches: A list of branches in which the final
commit in the merge list appears (changes without refs).
- :arg list errors: A list of error message strings
+ :arg list errors: A list of error message strings.
+ :arg float elapsed_time: Elapsed time of merge op in seconds.
"""
def __init__(self, request_uuid, build_set_uuid, merged, updated,
commit, files, repo_state, item_in_branches,
- errors):
+ errors, elapsed_time):
self.request_uuid = request_uuid
self.build_set_uuid = build_set_uuid
self.merged = merged
@@ -6190,6 +6269,7 @@ class MergeCompletedEvent(ResultEvent):
self.repo_state = repo_state or {}
self.item_in_branches = item_in_branches or []
self.errors = errors or []
+ self.elapsed_time = elapsed_time
def __repr__(self):
return ('<MergeCompletedEvent job: %s buildset: %s merged: %s '
@@ -6209,6 +6289,7 @@ class MergeCompletedEvent(ResultEvent):
"repo_state": dict(self.repo_state),
"item_in_branches": list(self.item_in_branches),
"errors": list(self.errors),
+ "elapsed_time": self.elapsed_time,
}
@classmethod
@@ -6223,6 +6304,7 @@ class MergeCompletedEvent(ResultEvent):
dict(data.get("repo_state", {})),
list(data.get("item_in_branches", [])),
list(data.get("errors", [])),
+ data.get("elapsed_time"),
)
@@ -6231,16 +6313,19 @@ class FilesChangesCompletedEvent(ResultEvent):
:arg BuildSet build_set: The build_set which is ready.
:arg list files: List of files changed.
+ :arg float elapsed_time: Elapsed time of merge op in seconds.
"""
- def __init__(self, build_set_uuid, files):
+ def __init__(self, build_set_uuid, files, elapsed_time):
self.build_set_uuid = build_set_uuid
self.files = files or []
+ self.elapsed_time = elapsed_time
def toDict(self):
return {
"build_set_uuid": self.build_set_uuid,
"files": list(self.files),
+ "elapsed_time": self.elapsed_time,
}
@classmethod
@@ -6248,6 +6333,7 @@ class FilesChangesCompletedEvent(ResultEvent):
return cls(
data.get("build_set_uuid"),
list(data.get("files", [])),
+ data.get("elapsed_time"),
)
@@ -6289,6 +6375,9 @@ class TriggerEvent(AbstractEvent):
self.branch_deleted = False
self.branch_protected = True
self.ref = None
+ # For reconfiguration sequencing
+ self.min_reconfigure_ltime = -1
+ self.zuul_event_ltime = None
# For management events (eg: enqueue / promote)
self.tenant_name = None
self.project_hostname = None
@@ -6326,6 +6415,8 @@ class TriggerEvent(AbstractEvent):
"branch_deleted": self.branch_deleted,
"branch_protected": self.branch_protected,
"ref": self.ref,
+ "min_reconfigure_ltime": self.min_reconfigure_ltime,
+ "zuul_event_ltime": self.zuul_event_ltime,
"tenant_name": self.tenant_name,
"project_hostname": self.project_hostname,
"project_name": self.project_name,
@@ -6358,6 +6449,8 @@ class TriggerEvent(AbstractEvent):
self.branch_deleted = d["branch_deleted"]
self.branch_protected = d["branch_protected"]
self.ref = d["ref"]
+ self.min_reconfigure_ltime = d.get("min_reconfigure_ltime", -1)
+ self.zuul_event_ltime = d.get("zuul_event_ltime", None)
self.tenant_name = d["tenant_name"]
self.project_hostname = d["project_hostname"]
self.project_name = d["project_name"]
@@ -7066,6 +7159,7 @@ class Layout(object):
noop = Job('noop')
noop.description = 'A job that will always succeed, no operation.'
noop.parent = noop.BASE_JOB_MARKER
+ noop.deduplicate = False
noop.run = (PlaybookContext(None, 'noop.yaml', [], []),)
self.jobs = {'noop': [noop]}
self.nodesets = {}
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
index 9b8f2c11c..5723316a3 100644
--- a/zuul/reporter/__init__.py
+++ b/zuul/reporter/__init__.py
@@ -257,9 +257,13 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
# Extract the report elements from an item
config = self.connection.sched.config
jobs_fields = []
+ skipped = 0
for job in item.getJobs():
build = item.current_build_set.getBuild(job.name)
(result, url) = item.formatJobResult(job)
+ if result == 'SKIPPED':
+ skipped += 1
+ continue
if not job.voting:
voting = ' (non-voting)'
else:
@@ -300,12 +304,15 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
success_message = job.success_message
jobs_fields.append(
(name, url, result, error, elapsed, voting, success_message))
- return jobs_fields
+ return jobs_fields, skipped
def _formatItemReportJobs(self, item):
# Return the list of jobs portion of the report
ret = ''
- jobs_fields = self._getItemReportJobsFields(item)
+ jobs_fields, skipped = self._getItemReportJobsFields(item)
for job_fields in jobs_fields:
ret += '- %s%s : %s%s%s%s\n' % job_fields[:6]
+ if skipped:
+ jobtext = 'job' if skipped == 1 else 'jobs'
+ ret += 'Skipped %i %s\n' % (skipped, jobtext)
return ret
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index b436c356f..272235757 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -860,9 +860,14 @@ class Scheduler(threading.Thread):
self.log.exception("Exception reporting runtime stats")
def reconfigureTenant(self, tenant, project, trigger_event):
+ if trigger_event:
+ trigger_event_ltime = trigger_event.zuul_event_ltime
+ else:
+ trigger_event_ltime = None
self.log.debug("Submitting tenant reconfiguration event for "
- "%s due to event %s in project %s",
- tenant.name, trigger_event, project)
+ "%s due to event %s in project %s, ltime %s",
+ tenant.name, trigger_event, project,
+ trigger_event_ltime)
branch = trigger_event and trigger_event.branch
event = TenantReconfigureEvent(
tenant.name, project.canonical_name, branch,
@@ -870,6 +875,7 @@ class Scheduler(threading.Thread):
if trigger_event:
event.branch_cache_ltimes[trigger_event.connection_name] = (
trigger_event.branch_cache_ltime)
+ event.trigger_event_ltime = trigger_event_ltime
self.management_events[tenant.name].put(event, needs_result=False)
def fullReconfigureCommandHandler(self):
@@ -970,7 +976,7 @@ class Scheduler(threading.Thread):
if layout_state is None:
# Reconfigure only tenants w/o an existing layout state
ctx = self.createZKContext(tlock, self.log)
- self._reconfigureTenant(ctx, min_ltimes, tenant)
+ self._reconfigureTenant(ctx, min_ltimes, -1, tenant)
self._reportInitialStats(tenant)
else:
self.local_layout_state[tenant_name] = layout_state
@@ -1422,6 +1428,7 @@ class Scheduler(threading.Thread):
ctx = self.createZKContext(lock, self.log)
if tenant is not None:
self._reconfigureTenant(ctx, min_ltimes,
+ -1,
tenant, old_tenant)
else:
self._reconfigureDeleteTenant(ctx, old_tenant)
@@ -1485,6 +1492,7 @@ class Scheduler(threading.Thread):
tenant = self.abide.tenants[event.tenant_name]
ctx = self.createZKContext(lock, self.log)
self._reconfigureTenant(ctx, min_ltimes,
+ event.trigger_event_ltime,
tenant, old_tenant)
duration = round(time.monotonic() - start, 3)
self.log.info("Tenant reconfiguration complete for %s (duration: %s "
@@ -1639,7 +1647,8 @@ class Scheduler(threading.Thread):
request)
self.cancelJob(build_set, request_job)
- def _reconfigureTenant(self, context, min_ltimes, tenant,
+ def _reconfigureTenant(self, context, min_ltimes,
+ last_reconfigure_event_ltime, tenant,
old_tenant=None):
# This is called from _doReconfigureEvent while holding the
# layout lock
@@ -1666,10 +1675,29 @@ class Scheduler(threading.Thread):
for s in self.connections.getSources()
}
+ # Make sure last_reconfigure_event_ltime never goes backward
+ old_layout_state = self.tenant_layout_state.get(tenant.name)
+ if old_layout_state:
+ if (old_layout_state.last_reconfigure_event_ltime >
+ last_reconfigure_event_ltime):
+ self.log.debug("Setting layout state last reconfigure ltime "
+ "to previous ltime %s which is newer than %s",
+ old_layout_state.last_reconfigure_event_ltime,
+ last_reconfigure_event_ltime)
+ last_reconfigure_event_ltime =\
+ old_layout_state.last_reconfigure_event_ltime
+ if last_reconfigure_event_ltime < 0:
+ last_reconfigure_event_ltime = self.zk_client.getCurrentLtime()
+ self.log.debug("Setting layout state last reconfigure ltime "
+ "to current ltime %s", last_reconfigure_event_ltime)
+ else:
+ self.log.debug("Setting layout state last reconfigure ltime "
+ "to %s", last_reconfigure_event_ltime)
layout_state = LayoutState(
tenant_name=tenant.name,
hostname=self.hostname,
last_reconfigured=int(time.time()),
+ last_reconfigure_event_ltime=last_reconfigure_event_ltime,
uuid=tenant.layout.uuid,
branch_cache_min_ltimes=branch_cache_min_ltimes,
)
@@ -2178,6 +2206,8 @@ class Scheduler(threading.Thread):
"Unable to refresh pipeline change list for %s",
pipeline.name)
+ # Get the ltime of the last reconfiguration event
+ self.trigger_events[tenant.name].refreshMetadata()
for event in self.trigger_events[tenant.name]:
log = get_annotated_logger(self.log, event.zuul_event_id)
log.debug("Forwarding trigger event %s", event)
@@ -2266,7 +2296,15 @@ class Scheduler(threading.Thread):
# out cached data for this project and perform a
# reconfiguration.
self.reconfigureTenant(tenant, change.project, event)
-
+ # This will become the new required minimum event ltime
+ # for every trigger event processed after the
+ # reconfiguration, so make sure we update it after having
+ # submitted the reconfiguration event.
+ self.trigger_events[tenant.name].last_reconfigure_event_ltime =\
+ event.zuul_event_ltime
+
+ event.min_reconfigure_ltime = self.trigger_events[
+ tenant.name].last_reconfigure_event_ltime
for pipeline in tenant.layout.pipelines.values():
if (
pipeline.manager.eventMatches(event, change)
@@ -2281,6 +2319,21 @@ class Scheduler(threading.Thread):
if self._stopped:
return
log = get_annotated_logger(self.log, event.zuul_event_id)
+ if not isinstance(event, SupercedeEvent):
+ local_state = self.local_layout_state[tenant.name]
+ last_ltime = local_state.last_reconfigure_event_ltime
+ # The event tells us the ltime of the most recent
+ # reconfiguration event up to that point. If our local
+ # layout state wasn't generated by an event after that
+ # time, then we are too out of date to process this event.
+ # Abort now and wait for an update.
+ if (event.min_reconfigure_ltime > -1 and
+ event.min_reconfigure_ltime > last_ltime):
+ log.debug("Trigger event minimum reconfigure ltime of %s "
+ "newer than current reconfigure ltime of %s, "
+ "aborting early",
+ event.min_reconfigure_ltime, last_ltime)
+ return
log.debug("Processing trigger event %s", event)
try:
if isinstance(event, SupercedeEvent):
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index 8718f609c..52ffd582e 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -776,7 +776,7 @@ class TriggerEventQueue(ZooKeeperEventQueue):
self._put(data)
def __iter__(self):
- for data, ack_ref, _ in self._iterEvents():
+ for data, ack_ref, zstat in self._iterEvents():
try:
if (data["driver_name"] is None and
data["event_type"] == "SupercedeEvent"):
@@ -793,6 +793,9 @@ class TriggerEventQueue(ZooKeeperEventQueue):
event = event_class.fromDict(event_data)
event.ack_ref = ack_ref
event.driver_name = data["driver_name"]
+ # Initialize the logical timestamp if not valid
+ if event.zuul_event_ltime is None:
+ event.zuul_event_ltime = zstat.creation_transaction_id
yield event
@@ -803,6 +806,28 @@ class TenantTriggerEventQueue(TriggerEventQueue):
queue_root = TENANT_TRIGGER_ROOT.format(
tenant=tenant_name)
super().__init__(client, queue_root, connections)
+ self.metadata = {}
+
+ def _setQueueMetadata(self):
+ encoded_data = json.dumps(
+ self.metadata, sort_keys=True).encode("utf-8")
+ self.kazoo_client.set(self.queue_root, encoded_data)
+
+ def refreshMetadata(self):
+ data, zstat = self.kazoo_client.get(self.queue_root)
+ try:
+ self.metadata = json.loads(data)
+ except json.JSONDecodeError:
+ self.metadata = {}
+
+ @property
+ def last_reconfigure_event_ltime(self):
+ return self.metadata.get('last_reconfigure_event_ltime', -1)
+
+ @last_reconfigure_event_ltime.setter
+ def last_reconfigure_event_ltime(self, val):
+ self.metadata['last_reconfigure_event_ltime'] = val
+ self._setQueueMetadata()
@classmethod
def createRegistry(cls, client, connections):
diff --git a/zuul/zk/layout.py b/zuul/zk/layout.py
index 386a93cc4..533226767 100644
--- a/zuul/zk/layout.py
+++ b/zuul/zk/layout.py
@@ -49,12 +49,15 @@ class LayoutState:
"""
def __init__(self, tenant_name, hostname, last_reconfigured, uuid,
- branch_cache_min_ltimes, ltime=-1):
+ branch_cache_min_ltimes, last_reconfigure_event_ltime,
+ ltime=-1):
self.uuid = uuid
self.ltime = ltime
self.tenant_name = tenant_name
self.hostname = hostname
self.last_reconfigured = last_reconfigured
+ self.last_reconfigure_event_ltime =\
+ last_reconfigure_event_ltime
self.branch_cache_min_ltimes = branch_cache_min_ltimes
def toDict(self):
@@ -62,6 +65,8 @@ class LayoutState:
"tenant_name": self.tenant_name,
"hostname": self.hostname,
"last_reconfigured": self.last_reconfigured,
+ "last_reconfigure_event_ltime":
+ self.last_reconfigure_event_ltime,
"uuid": self.uuid,
"branch_cache_min_ltimes": self.branch_cache_min_ltimes,
}
@@ -74,6 +79,7 @@ class LayoutState:
data["last_reconfigured"],
data.get("uuid"),
data.get("branch_cache_min_ltimes"),
+ data.get("last_reconfigure_event_ltime", -1),
data.get("ltime", -1),
)
diff --git a/zuul/zk/nodepool.py b/zuul/zk/nodepool.py
index 109053f11..1be4dc2b5 100644
--- a/zuul/zk/nodepool.py
+++ b/zuul/zk/nodepool.py
@@ -12,6 +12,7 @@
import json
import logging
+import os
import time
from enum import Enum
from typing import Optional, List
@@ -43,7 +44,7 @@ class ZooKeeperNodepool(ZooKeeperBase):
Class implementing Nodepool related ZooKeeper interface.
"""
NODES_ROOT = "/nodepool/nodes"
- LAUNCHER_ROOT = "/nodepool/launchers"
+ COMPONENT_ROOT = "/nodepool/components"
REQUEST_ROOT = '/nodepool/requests'
REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
HOLD_REQUEST_ROOT = '/zuul/hold-requests'
@@ -95,9 +96,6 @@ class ZooKeeperNodepool(ZooKeeperBase):
self._node_tree.close()
self._node_tree = None
- def _launcherPath(self, launcher):
- return "%s/%s" % (self.LAUNCHER_ROOT, launcher)
-
def _nodePath(self, node):
return "%s/%s" % (self.NODES_ROOT, node)
@@ -113,15 +111,15 @@ class ZooKeeperNodepool(ZooKeeperBase):
:returns: A list of Launcher objects, or empty list if none are found.
"""
+ root_path = os.path.join(self.COMPONENT_ROOT, 'pool')
try:
- launcher_ids = self.kazoo_client\
- .get_children(self.LAUNCHER_ROOT)
+ pools = self.kazoo_client.get_children(root_path)
except NoNodeError:
return []
objs = []
- for launcher in launcher_ids:
- path = self._launcherPath(launcher)
+ for pool in pools:
+ path = os.path.join(root_path, pool)
try:
data, _ = self.kazoo_client.get(path)
except NoNodeError:
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index aa32b8b9b..8de3f34ba 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -43,6 +43,7 @@ class ZKContext:
self.cumulative_write_znodes = 0
self.cumulative_read_bytes = 0
self.cumulative_write_bytes = 0
+ self.build_references = False
def sessionIsValid(self):
return ((not self.lock or self.lock.is_still_valid()) and