summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-07-22 02:34:14 +0000
committerGerrit Code Review <review@openstack.org>2022-07-22 02:34:14 +0000
commit8a975bccb9d5baec03a4ca486c0da37e28e2bcb4 (patch)
tree6c13517c7138b13d8adf49075c0c3d5559d64dcf
parent559602910ff57f90e6478e748d17e0e298430c73 (diff)
parent9a279725f9b1a266bc3bb1e36f93a83e3405f33b (diff)
downloadzuul-8a975bccb9d5baec03a4ca486c0da37e28e2bcb4.tar.gz
Merge "Strictly sequence reconfiguration events"
-rw-r--r--tests/fixtures/layouts/trigger-sequence.yaml75
-rw-r--r--tests/unit/test_gerrit.py1
-rw-r--r--tests/unit/test_github_driver.py2
-rw-r--r--tests/unit/test_gitlab_driver.py2
-rw-r--r--tests/unit/test_pagure_driver.py2
-rw-r--r--tests/unit/test_scheduler.py53
-rw-r--r--tests/unit/test_zk.py10
-rwxr-xr-xzuul/cmd/client.py1
-rw-r--r--zuul/model.py12
-rw-r--r--zuul/scheduler.py63
-rw-r--r--zuul/zk/event_queues.py27
-rw-r--r--zuul/zk/layout.py8
12 files changed, 240 insertions, 16 deletions
diff --git a/tests/fixtures/layouts/trigger-sequence.yaml b/tests/fixtures/layouts/trigger-sequence.yaml
new file mode 100644
index 000000000..31db734b1
--- /dev/null
+++ b/tests/fixtures/layouts/trigger-sequence.yaml
@@ -0,0 +1,75 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- pipeline:
+ name: gate
+ manager: dependent
+ success-message: Build succeeded (gate).
+ trigger:
+ gerrit:
+ - event: comment-added
+ approval:
+ - Approved: 1
+ success:
+ gerrit:
+ Verified: 2
+ submit: true
+ failure:
+ gerrit:
+ Verified: -2
+ start:
+ gerrit:
+ Verified: 0
+ precedence: high
+
+- pipeline:
+ name: post
+ manager: independent
+ trigger:
+ gerrit:
+ - event: ref-updated
+ ref: ^(?!refs/).*$
+
+- pipeline:
+ name: tag
+ manager: independent
+ trigger:
+ gerrit:
+ - event: ref-updated
+ ref: ^refs/tags/.*$
+
+- job:
+ name: base
+ parent: null
+ run: playbooks/base.yaml
+ nodeset:
+ nodes:
+ - label: ubuntu-xenial
+ name: controller
+
+- job:
+ name: check-job
+ run: playbooks/check.yaml
+
+- job:
+ name: post-job
+ run: playbooks/post.yaml
+
+- project:
+ name: org/project
+ check:
+ jobs:
+ - check-job
+ gate:
+ jobs:
+ - check-job
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py
index f0f9027bd..aa2bb1758 100644
--- a/tests/unit/test_gerrit.py
+++ b/tests/unit/test_gerrit.py
@@ -699,6 +699,7 @@ class TestPolling(ZuulTestCase):
files=file_dict)
A.setMerged()
self.waitForPoll('gerrit')
+ self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
B.setCheck('zuul:check', reset=True)
diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py
index 1bfba36bb..f9f3371fa 100644
--- a/tests/unit/test_github_driver.py
+++ b/tests/unit/test_github_driver.py
@@ -38,7 +38,7 @@ from tests.base import (AnsibleZuulTestCase, BaseTestCase,
simple_layout, random_sha1)
from tests.base import ZuulWebFixture
-EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
+EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
class TestGithubDriver(ZuulTestCase):
diff --git a/tests/unit/test_gitlab_driver.py b/tests/unit/test_gitlab_driver.py
index 2715cdef1..6c4d4eeb9 100644
--- a/tests/unit/test_gitlab_driver.py
+++ b/tests/unit/test_gitlab_driver.py
@@ -28,7 +28,7 @@ from tests.base import ZuulTestCase, ZuulWebFixture
from testtools.matchers import MatchesRegex
-EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
+EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
class TestGitlabWebhook(ZuulTestCase):
diff --git a/tests/unit/test_pagure_driver.py b/tests/unit/test_pagure_driver.py
index 32635a389..92159cc9f 100644
--- a/tests/unit/test_pagure_driver.py
+++ b/tests/unit/test_pagure_driver.py
@@ -26,7 +26,7 @@ from zuul.zk.layout import LayoutState
from tests.base import ZuulTestCase, simple_layout
from tests.base import ZuulWebFixture
-EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
+EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
class TestPagureDriver(ZuulTestCase):
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index c6865a8d7..eb5ee826f 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -50,8 +50,9 @@ from tests.base import (
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.layout import LayoutState
+from zuul.zk.locks import management_queue_lock
-EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {})
+EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
class TestSchedulerSSL(SSLZuulTestCase):
@@ -4214,6 +4215,56 @@ class TestScheduler(ZuulTestCase):
dict(name='check-job', result='SUCCESS', changes='1,1'),
])
+ @simple_layout('layouts/trigger-sequence.yaml')
+ def test_live_reconfiguration_trigger_sequence(self):
+ # Test that events arriving after an event that triggers a
+ # reconfiguration are handled after the reconfiguration
+ # completes.
+
+ in_repo_conf = "[{project: {tag: {jobs: [post-job]}}}]"
+ file_dict = {'zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ sched = self.scheds.first.sched
+ # Hold the management queue so that we don't process any
+ # reconfiguration events yet.
+ with management_queue_lock(
+ self.zk_client, 'tenant-one', blocking=False
+ ):
+ with sched.run_handler_lock:
+ A.setMerged()
+ # Submit two events while no processing is happening:
+ # A change merged event that will trigger a reconfiguration
+ self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+
+ # And a tag event which should only run a job after
+ # the config change above is in effect.
+ event = self.fake_gerrit.addFakeTag(
+ 'org/project', 'master', 'foo')
+ self.fake_gerrit.addEvent(event)
+
+ # Wait for the tenant trigger queue to empty out, and for
+ # us to have a tenant management as well as a pipeline
+ # trigger event. At this point, we should be deferring
+ # the trigger event until the management event is handled.
+ for _ in iterate_timeout(60, 'queues'):
+ with sched.run_handler_lock:
+ if sched.trigger_events['tenant-one'].hasEvents():
+ continue
+ if not sched.pipeline_trigger_events[
+ 'tenant-one']['tag'].hasEvents():
+ continue
+ if not sched.management_events['tenant-one'].hasEvents():
+ continue
+ break
+
+ # Now we can resume and process the reconfiguration event
+ sched.wake_event.set()
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='post-job', result='SUCCESS'),
+ ])
+
@simple_layout('layouts/repo-deleted.yaml')
def test_repo_deleted(self):
self.init_repo("org/delete-project")
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index eadf5c855..b1f393e47 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -1290,7 +1290,7 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
"github": 456,
}
state = LayoutState("tenant", "hostname", 0, layout_uuid,
- branch_cache_min_ltimes)
+ branch_cache_min_ltimes, -1)
store["tenant"] = state
self.assertEqual(state, store["tenant"])
self.assertNotEqual(state.ltime, -1)
@@ -1301,9 +1301,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
def test_ordering(self):
layout_uuid = uuid.uuid4().hex
state_one = LayoutState("tenant", "hostname", 1, layout_uuid,
- {}, ltime=1)
+ {}, -1, ltime=1)
state_two = LayoutState("tenant", "hostname", 2, layout_uuid,
- {}, ltime=2)
+ {}, -1, ltime=2)
self.assertGreater(state_two, state_one)
@@ -1312,9 +1312,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
min_ltimes = defaultdict(lambda x: -1)
min_ltimes['foo'] = 1
state_one = LayoutState("tenant", "hostname", 1, uuid.uuid4().hex,
- {}, ltime=1)
+ {}, -1, ltime=1)
state_two = LayoutState("tenant", "hostname", 2, uuid.uuid4().hex,
- {}, ltime=2)
+ {}, -1, ltime=2)
store.setMinLtimes(state_one, min_ltimes)
store.setMinLtimes(state_two, min_ltimes)
store['tenant'] = state_one
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 490e47c59..6d6f16968 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -1044,6 +1044,7 @@ class Client(zuul.cmd.ZuulApp):
tenant_name=args.tenant,
hostname='admin command',
last_reconfigured=int(time.time()),
+ last_reconfigure_event_ltime=-1,
uuid=uuid4().hex,
branch_cache_min_ltimes={},
ltime=ps._zstat.last_modified_transaction_id,
diff --git a/zuul/model.py b/zuul/model.py
index aa814ce6c..a07d5a640 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -5910,6 +5910,7 @@ class TenantReconfigureEvent(ManagementEvent):
self.tenant_name = tenant_name
self.project_branches = set([(project_name, branch_name)])
self.branch_cache_ltimes = {}
+ self.trigger_event_ltime = -1
self.merged_events = []
def __ne__(self, other):
@@ -5931,6 +5932,8 @@ class TenantReconfigureEvent(ManagementEvent):
self.branch_cache_ltimes.get(connection_name, ltime), ltime)
self.zuul_event_ltime = max(self.zuul_event_ltime,
other.zuul_event_ltime)
+ self.trigger_event_ltime = max(self.trigger_event_ltime,
+ other.trigger_event_ltime)
self.merged_events.append(other)
def toDict(self):
@@ -5938,6 +5941,7 @@ class TenantReconfigureEvent(ManagementEvent):
d["tenant_name"] = self.tenant_name
d["project_branches"] = list(self.project_branches)
d["branch_cache_ltimes"] = self.branch_cache_ltimes
+ d["trigger_event_ltime"] = self.trigger_event_ltime
return d
@classmethod
@@ -5953,6 +5957,7 @@ class TenantReconfigureEvent(ManagementEvent):
tuple(pb) for pb in data["project_branches"]
)
event.branch_cache_ltimes = data.get("branch_cache_ltimes", {})
+ event.trigger_event_ltime = data.get("trigger_event_ltime", -1)
return event
@@ -6289,6 +6294,9 @@ class TriggerEvent(AbstractEvent):
self.branch_deleted = False
self.branch_protected = True
self.ref = None
+ # For reconfiguration sequencing
+ self.min_reconfigure_ltime = -1
+ self.zuul_event_ltime = None
# For management events (eg: enqueue / promote)
self.tenant_name = None
self.project_hostname = None
@@ -6326,6 +6334,8 @@ class TriggerEvent(AbstractEvent):
"branch_deleted": self.branch_deleted,
"branch_protected": self.branch_protected,
"ref": self.ref,
+ "min_reconfigure_ltime": self.min_reconfigure_ltime,
+ "zuul_event_ltime": self.zuul_event_ltime,
"tenant_name": self.tenant_name,
"project_hostname": self.project_hostname,
"project_name": self.project_name,
@@ -6358,6 +6368,8 @@ class TriggerEvent(AbstractEvent):
self.branch_deleted = d["branch_deleted"]
self.branch_protected = d["branch_protected"]
self.ref = d["ref"]
+ self.min_reconfigure_ltime = d.get("min_reconfigure_ltime", -1)
+ self.zuul_event_ltime = d.get("zuul_event_ltime", None)
self.tenant_name = d["tenant_name"]
self.project_hostname = d["project_hostname"]
self.project_name = d["project_name"]
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index b436c356f..272235757 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -860,9 +860,14 @@ class Scheduler(threading.Thread):
self.log.exception("Exception reporting runtime stats")
def reconfigureTenant(self, tenant, project, trigger_event):
+ if trigger_event:
+ trigger_event_ltime = trigger_event.zuul_event_ltime
+ else:
+ trigger_event_ltime = None
self.log.debug("Submitting tenant reconfiguration event for "
- "%s due to event %s in project %s",
- tenant.name, trigger_event, project)
+ "%s due to event %s in project %s, ltime %s",
+ tenant.name, trigger_event, project,
+ trigger_event_ltime)
branch = trigger_event and trigger_event.branch
event = TenantReconfigureEvent(
tenant.name, project.canonical_name, branch,
@@ -870,6 +875,7 @@ class Scheduler(threading.Thread):
if trigger_event:
event.branch_cache_ltimes[trigger_event.connection_name] = (
trigger_event.branch_cache_ltime)
+ event.trigger_event_ltime = trigger_event_ltime
self.management_events[tenant.name].put(event, needs_result=False)
def fullReconfigureCommandHandler(self):
@@ -970,7 +976,7 @@ class Scheduler(threading.Thread):
if layout_state is None:
# Reconfigure only tenants w/o an existing layout state
ctx = self.createZKContext(tlock, self.log)
- self._reconfigureTenant(ctx, min_ltimes, tenant)
+ self._reconfigureTenant(ctx, min_ltimes, -1, tenant)
self._reportInitialStats(tenant)
else:
self.local_layout_state[tenant_name] = layout_state
@@ -1422,6 +1428,7 @@ class Scheduler(threading.Thread):
ctx = self.createZKContext(lock, self.log)
if tenant is not None:
self._reconfigureTenant(ctx, min_ltimes,
+ -1,
tenant, old_tenant)
else:
self._reconfigureDeleteTenant(ctx, old_tenant)
@@ -1485,6 +1492,7 @@ class Scheduler(threading.Thread):
tenant = self.abide.tenants[event.tenant_name]
ctx = self.createZKContext(lock, self.log)
self._reconfigureTenant(ctx, min_ltimes,
+ event.trigger_event_ltime,
tenant, old_tenant)
duration = round(time.monotonic() - start, 3)
self.log.info("Tenant reconfiguration complete for %s (duration: %s "
@@ -1639,7 +1647,8 @@ class Scheduler(threading.Thread):
request)
self.cancelJob(build_set, request_job)
- def _reconfigureTenant(self, context, min_ltimes, tenant,
+ def _reconfigureTenant(self, context, min_ltimes,
+ last_reconfigure_event_ltime, tenant,
old_tenant=None):
# This is called from _doReconfigureEvent while holding the
# layout lock
@@ -1666,10 +1675,29 @@ class Scheduler(threading.Thread):
for s in self.connections.getSources()
}
+ # Make sure last_reconfigure_event_ltime never goes backward
+ old_layout_state = self.tenant_layout_state.get(tenant.name)
+ if old_layout_state:
+ if (old_layout_state.last_reconfigure_event_ltime >
+ last_reconfigure_event_ltime):
+ self.log.debug("Setting layout state last reconfigure ltime "
+ "to previous ltime %s which is newer than %s",
+ old_layout_state.last_reconfigure_event_ltime,
+ last_reconfigure_event_ltime)
+ last_reconfigure_event_ltime =\
+ old_layout_state.last_reconfigure_event_ltime
+ if last_reconfigure_event_ltime < 0:
+ last_reconfigure_event_ltime = self.zk_client.getCurrentLtime()
+ self.log.debug("Setting layout state last reconfigure ltime "
+ "to current ltime %s", last_reconfigure_event_ltime)
+ else:
+ self.log.debug("Setting layout state last reconfigure ltime "
+ "to %s", last_reconfigure_event_ltime)
layout_state = LayoutState(
tenant_name=tenant.name,
hostname=self.hostname,
last_reconfigured=int(time.time()),
+ last_reconfigure_event_ltime=last_reconfigure_event_ltime,
uuid=tenant.layout.uuid,
branch_cache_min_ltimes=branch_cache_min_ltimes,
)
@@ -2178,6 +2206,8 @@ class Scheduler(threading.Thread):
"Unable to refresh pipeline change list for %s",
pipeline.name)
+ # Get the ltime of the last reconfiguration event
+ self.trigger_events[tenant.name].refreshMetadata()
for event in self.trigger_events[tenant.name]:
log = get_annotated_logger(self.log, event.zuul_event_id)
log.debug("Forwarding trigger event %s", event)
@@ -2266,7 +2296,15 @@ class Scheduler(threading.Thread):
# out cached data for this project and perform a
# reconfiguration.
self.reconfigureTenant(tenant, change.project, event)
-
+ # This will become the new required minimum event ltime
+ # for every trigger event processed after the
+ # reconfiguration, so make sure we update it after having
+ # submitted the reconfiguration event.
+ self.trigger_events[tenant.name].last_reconfigure_event_ltime =\
+ event.zuul_event_ltime
+
+ event.min_reconfigure_ltime = self.trigger_events[
+ tenant.name].last_reconfigure_event_ltime
for pipeline in tenant.layout.pipelines.values():
if (
pipeline.manager.eventMatches(event, change)
@@ -2281,6 +2319,21 @@ class Scheduler(threading.Thread):
if self._stopped:
return
log = get_annotated_logger(self.log, event.zuul_event_id)
+ if not isinstance(event, SupercedeEvent):
+ local_state = self.local_layout_state[tenant.name]
+ last_ltime = local_state.last_reconfigure_event_ltime
+ # The event tells us the ltime of the most recent
+ # reconfiguration event up to that point. If our local
+ # layout state wasn't generated by an event after that
+ # time, then we are too out of date to process this event.
+ # Abort now and wait for an update.
+ if (event.min_reconfigure_ltime > -1 and
+ event.min_reconfigure_ltime > last_ltime):
+ log.debug("Trigger event minimum reconfigure ltime of %s "
+ "newer than current reconfigure ltime of %s, "
+ "aborting early",
+ event.min_reconfigure_ltime, last_ltime)
+ return
log.debug("Processing trigger event %s", event)
try:
if isinstance(event, SupercedeEvent):
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index 8718f609c..52ffd582e 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -776,7 +776,7 @@ class TriggerEventQueue(ZooKeeperEventQueue):
self._put(data)
def __iter__(self):
- for data, ack_ref, _ in self._iterEvents():
+ for data, ack_ref, zstat in self._iterEvents():
try:
if (data["driver_name"] is None and
data["event_type"] == "SupercedeEvent"):
@@ -793,6 +793,9 @@ class TriggerEventQueue(ZooKeeperEventQueue):
event = event_class.fromDict(event_data)
event.ack_ref = ack_ref
event.driver_name = data["driver_name"]
+ # Initialize the logical timestamp if not valid
+ if event.zuul_event_ltime is None:
+ event.zuul_event_ltime = zstat.creation_transaction_id
yield event
@@ -803,6 +806,28 @@ class TenantTriggerEventQueue(TriggerEventQueue):
queue_root = TENANT_TRIGGER_ROOT.format(
tenant=tenant_name)
super().__init__(client, queue_root, connections)
+ self.metadata = {}
+
+ def _setQueueMetadata(self):
+ encoded_data = json.dumps(
+ self.metadata, sort_keys=True).encode("utf-8")
+ self.kazoo_client.set(self.queue_root, encoded_data)
+
+ def refreshMetadata(self):
+ data, zstat = self.kazoo_client.get(self.queue_root)
+ try:
+ self.metadata = json.loads(data)
+ except json.JSONDecodeError:
+ self.metadata = {}
+
+ @property
+ def last_reconfigure_event_ltime(self):
+ return self.metadata.get('last_reconfigure_event_ltime', -1)
+
+ @last_reconfigure_event_ltime.setter
+ def last_reconfigure_event_ltime(self, val):
+ self.metadata['last_reconfigure_event_ltime'] = val
+ self._setQueueMetadata()
@classmethod
def createRegistry(cls, client, connections):
diff --git a/zuul/zk/layout.py b/zuul/zk/layout.py
index 386a93cc4..533226767 100644
--- a/zuul/zk/layout.py
+++ b/zuul/zk/layout.py
@@ -49,12 +49,15 @@ class LayoutState:
"""
def __init__(self, tenant_name, hostname, last_reconfigured, uuid,
- branch_cache_min_ltimes, ltime=-1):
+ branch_cache_min_ltimes, last_reconfigure_event_ltime,
+ ltime=-1):
self.uuid = uuid
self.ltime = ltime
self.tenant_name = tenant_name
self.hostname = hostname
self.last_reconfigured = last_reconfigured
+ self.last_reconfigure_event_ltime =\
+ last_reconfigure_event_ltime
self.branch_cache_min_ltimes = branch_cache_min_ltimes
def toDict(self):
@@ -62,6 +65,8 @@ class LayoutState:
"tenant_name": self.tenant_name,
"hostname": self.hostname,
"last_reconfigured": self.last_reconfigured,
+ "last_reconfigure_event_ltime":
+ self.last_reconfigure_event_ltime,
"uuid": self.uuid,
"branch_cache_min_ltimes": self.branch_cache_min_ltimes,
}
@@ -74,6 +79,7 @@ class LayoutState:
data["last_reconfigured"],
data.get("uuid"),
data.get("branch_cache_min_ltimes"),
+ data.get("last_reconfigure_event_ltime", -1),
data.get("ltime", -1),
)