diff options
author | Zuul <zuul@review.opendev.org> | 2022-07-22 02:34:14 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2022-07-22 02:34:14 +0000 |
commit | 8a975bccb9d5baec03a4ca486c0da37e28e2bcb4 (patch) | |
tree | 6c13517c7138b13d8adf49075c0c3d5559d64dcf | |
parent | 559602910ff57f90e6478e748d17e0e298430c73 (diff) | |
parent | 9a279725f9b1a266bc3bb1e36f93a83e3405f33b (diff) | |
download | zuul-8a975bccb9d5baec03a4ca486c0da37e28e2bcb4.tar.gz |
Merge "Strictly sequence reconfiguration events"
-rw-r--r-- | tests/fixtures/layouts/trigger-sequence.yaml | 75 | ||||
-rw-r--r-- | tests/unit/test_gerrit.py | 1 | ||||
-rw-r--r-- | tests/unit/test_github_driver.py | 2 | ||||
-rw-r--r-- | tests/unit/test_gitlab_driver.py | 2 | ||||
-rw-r--r-- | tests/unit/test_pagure_driver.py | 2 | ||||
-rw-r--r-- | tests/unit/test_scheduler.py | 53 | ||||
-rw-r--r-- | tests/unit/test_zk.py | 10 | ||||
-rwxr-xr-x | zuul/cmd/client.py | 1 | ||||
-rw-r--r-- | zuul/model.py | 12 | ||||
-rw-r--r-- | zuul/scheduler.py | 63 | ||||
-rw-r--r-- | zuul/zk/event_queues.py | 27 | ||||
-rw-r--r-- | zuul/zk/layout.py | 8 |
12 files changed, 240 insertions, 16 deletions
diff --git a/tests/fixtures/layouts/trigger-sequence.yaml b/tests/fixtures/layouts/trigger-sequence.yaml new file mode 100644 index 000000000..31db734b1 --- /dev/null +++ b/tests/fixtures/layouts/trigger-sequence.yaml @@ -0,0 +1,75 @@ +- pipeline: + name: check + manager: independent + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + Verified: 1 + failure: + gerrit: + Verified: -1 + +- pipeline: + name: gate + manager: dependent + success-message: Build succeeded (gate). + trigger: + gerrit: + - event: comment-added + approval: + - Approved: 1 + success: + gerrit: + Verified: 2 + submit: true + failure: + gerrit: + Verified: -2 + start: + gerrit: + Verified: 0 + precedence: high + +- pipeline: + name: post + manager: independent + trigger: + gerrit: + - event: ref-updated + ref: ^(?!refs/).*$ + +- pipeline: + name: tag + manager: independent + trigger: + gerrit: + - event: ref-updated + ref: ^refs/tags/.*$ + +- job: + name: base + parent: null + run: playbooks/base.yaml + nodeset: + nodes: + - label: ubuntu-xenial + name: controller + +- job: + name: check-job + run: playbooks/check.yaml + +- job: + name: post-job + run: playbooks/post.yaml + +- project: + name: org/project + check: + jobs: + - check-job + gate: + jobs: + - check-job diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py index f0f9027bd..aa2bb1758 100644 --- a/tests/unit/test_gerrit.py +++ b/tests/unit/test_gerrit.py @@ -699,6 +699,7 @@ class TestPolling(ZuulTestCase): files=file_dict) A.setMerged() self.waitForPoll('gerrit') + self.waitUntilSettled() B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') B.setCheck('zuul:check', reset=True) diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py index 1bfba36bb..f9f3371fa 100644 --- a/tests/unit/test_github_driver.py +++ b/tests/unit/test_github_driver.py @@ -38,7 +38,7 @@ from tests.base import (AnsibleZuulTestCase, BaseTestCase, simple_layout, random_sha1) from tests.base import ZuulWebFixture -EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}) +EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) class TestGithubDriver(ZuulTestCase): diff --git a/tests/unit/test_gitlab_driver.py b/tests/unit/test_gitlab_driver.py index 2715cdef1..6c4d4eeb9 100644 --- a/tests/unit/test_gitlab_driver.py +++ b/tests/unit/test_gitlab_driver.py @@ -28,7 +28,7 @@ from tests.base import ZuulTestCase, ZuulWebFixture from testtools.matchers import MatchesRegex -EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}) +EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) class TestGitlabWebhook(ZuulTestCase): diff --git a/tests/unit/test_pagure_driver.py b/tests/unit/test_pagure_driver.py index 32635a389..92159cc9f 100644 --- a/tests/unit/test_pagure_driver.py +++ b/tests/unit/test_pagure_driver.py @@ -26,7 +26,7 @@ from zuul.zk.layout import LayoutState from tests.base import ZuulTestCase, simple_layout from tests.base import ZuulWebFixture -EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}) +EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) class TestPagureDriver(ZuulTestCase): diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index c6865a8d7..eb5ee826f 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -50,8 +50,9 @@ from tests.base import ( ) from zuul.zk.change_cache import ChangeKey from zuul.zk.layout import LayoutState +from zuul.zk.locks import management_queue_lock -EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}) +EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1) class TestSchedulerSSL(SSLZuulTestCase): @@ -4214,6 +4215,56 @@ class TestScheduler(ZuulTestCase): dict(name='check-job', result='SUCCESS', changes='1,1'), ]) + @simple_layout('layouts/trigger-sequence.yaml') + def test_live_reconfiguration_trigger_sequence(self): + # Test that events arriving after an event that triggers a + # reconfiguration are handled after the reconfiguration + # completes. + + in_repo_conf = "[{project: {tag: {jobs: [post-job]}}}]" + file_dict = {'zuul.yaml': in_repo_conf} + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A', + files=file_dict) + sched = self.scheds.first.sched + # Hold the management queue so that we don't process any + # reconfiguration events yet. + with management_queue_lock( + self.zk_client, 'tenant-one', blocking=False + ): + with sched.run_handler_lock: + A.setMerged() + # Submit two events while no processing is happening: + # A change merged event that will trigger a reconfiguration + self.fake_gerrit.addEvent(A.getChangeMergedEvent()) + + # And a tag event which should only run a job after + # the config change above is in effect. + event = self.fake_gerrit.addFakeTag( + 'org/project', 'master', 'foo') + self.fake_gerrit.addEvent(event) + + # Wait for the tenant trigger queue to empty out, and for + # us to have a tenant management as well as a pipeline + # trigger event. At this point, we should be deferring + # the trigger event until the management event is handled. + for _ in iterate_timeout(60, 'queues'): + with sched.run_handler_lock: + if sched.trigger_events['tenant-one'].hasEvents(): + continue + if not sched.pipeline_trigger_events[ + 'tenant-one']['tag'].hasEvents(): + continue + if not sched.management_events['tenant-one'].hasEvents(): + continue + break + + # Now we can resume and process the reconfiguration event + sched.wake_event.set() + self.waitUntilSettled() + self.assertHistory([ + dict(name='post-job', result='SUCCESS'), + ]) + @simple_layout('layouts/repo-deleted.yaml') def test_repo_deleted(self): self.init_repo("org/delete-project") diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index eadf5c855..b1f393e47 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -1290,7 +1290,7 @@ class TestLayoutStore(ZooKeeperBaseTestCase): "github": 456, } state = LayoutState("tenant", "hostname", 0, layout_uuid, - branch_cache_min_ltimes) + branch_cache_min_ltimes, -1) store["tenant"] = state self.assertEqual(state, store["tenant"]) self.assertNotEqual(state.ltime, -1) @@ -1301,9 +1301,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase): def test_ordering(self): layout_uuid = uuid.uuid4().hex state_one = LayoutState("tenant", "hostname", 1, layout_uuid, - {}, ltime=1) + {}, -1, ltime=1) state_two = LayoutState("tenant", "hostname", 2, layout_uuid, - {}, ltime=2) + {}, -1, ltime=2) self.assertGreater(state_two, state_one) @@ -1312,9 +1312,9 @@ class TestLayoutStore(ZooKeeperBaseTestCase): min_ltimes = defaultdict(lambda x: -1) min_ltimes['foo'] = 1 state_one = LayoutState("tenant", "hostname", 1, uuid.uuid4().hex, - {}, ltime=1) + {}, -1, ltime=1) state_two = LayoutState("tenant", "hostname", 2, uuid.uuid4().hex, - {}, ltime=2) + {}, -1, ltime=2) store.setMinLtimes(state_one, min_ltimes) store.setMinLtimes(state_two, min_ltimes) store['tenant'] = state_one diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index 490e47c59..6d6f16968 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -1044,6 +1044,7 @@ class Client(zuul.cmd.ZuulApp): tenant_name=args.tenant, hostname='admin command', last_reconfigured=int(time.time()), + last_reconfigure_event_ltime=-1, uuid=uuid4().hex, branch_cache_min_ltimes={}, ltime=ps._zstat.last_modified_transaction_id, diff --git a/zuul/model.py b/zuul/model.py index aa814ce6c..a07d5a640 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -5910,6 +5910,7 @@ class TenantReconfigureEvent(ManagementEvent): self.tenant_name = tenant_name self.project_branches = set([(project_name, branch_name)]) self.branch_cache_ltimes = {} + self.trigger_event_ltime = -1 self.merged_events = [] def __ne__(self, other): @@ -5931,6 +5932,8 @@ class TenantReconfigureEvent(ManagementEvent): self.branch_cache_ltimes.get(connection_name, ltime), ltime) self.zuul_event_ltime = max(self.zuul_event_ltime, other.zuul_event_ltime) + self.trigger_event_ltime = max(self.trigger_event_ltime, + other.trigger_event_ltime) self.merged_events.append(other) def toDict(self): @@ -5938,6 +5941,7 @@ class TenantReconfigureEvent(ManagementEvent): d["tenant_name"] = self.tenant_name d["project_branches"] = list(self.project_branches) d["branch_cache_ltimes"] = self.branch_cache_ltimes + d["trigger_event_ltime"] = self.trigger_event_ltime return d @classmethod @@ -5953,6 +5957,7 @@ class TenantReconfigureEvent(ManagementEvent): tuple(pb) for pb in data["project_branches"] ) event.branch_cache_ltimes = data.get("branch_cache_ltimes", {}) + event.trigger_event_ltime = data.get("trigger_event_ltime", -1) return event @@ -6289,6 +6294,9 @@ class TriggerEvent(AbstractEvent): self.branch_deleted = False self.branch_protected = True self.ref = None + # For reconfiguration sequencing + self.min_reconfigure_ltime = -1 + self.zuul_event_ltime = None # For management events (eg: enqueue / promote) self.tenant_name = None self.project_hostname = None @@ -6326,6 +6334,8 @@ class TriggerEvent(AbstractEvent): "branch_deleted": self.branch_deleted, "branch_protected": self.branch_protected, "ref": self.ref, + "min_reconfigure_ltime": self.min_reconfigure_ltime, + "zuul_event_ltime": self.zuul_event_ltime, "tenant_name": self.tenant_name, "project_hostname": self.project_hostname, "project_name": self.project_name, @@ -6358,6 +6368,8 @@ class TriggerEvent(AbstractEvent): self.branch_deleted = d["branch_deleted"] self.branch_protected = d["branch_protected"] self.ref = d["ref"] + self.min_reconfigure_ltime = d.get("min_reconfigure_ltime", -1) + self.zuul_event_ltime = d.get("zuul_event_ltime", None) self.tenant_name = d["tenant_name"] self.project_hostname = d["project_hostname"] self.project_name = d["project_name"] diff --git a/zuul/scheduler.py b/zuul/scheduler.py index b436c356f..272235757 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -860,9 +860,14 @@ class Scheduler(threading.Thread): self.log.exception("Exception reporting runtime stats") def reconfigureTenant(self, tenant, project, trigger_event): + if trigger_event: + trigger_event_ltime = trigger_event.zuul_event_ltime + else: + trigger_event_ltime = None self.log.debug("Submitting tenant reconfiguration event for " - "%s due to event %s in project %s", - tenant.name, trigger_event, project) + "%s due to event %s in project %s, ltime %s", + tenant.name, trigger_event, project, + trigger_event_ltime) branch = trigger_event and trigger_event.branch event = TenantReconfigureEvent( tenant.name, project.canonical_name, branch, @@ -870,6 +875,7 @@ class Scheduler(threading.Thread): if trigger_event: event.branch_cache_ltimes[trigger_event.connection_name] = ( trigger_event.branch_cache_ltime) + event.trigger_event_ltime = trigger_event_ltime self.management_events[tenant.name].put(event, needs_result=False) def fullReconfigureCommandHandler(self): @@ -970,7 +976,7 @@ class Scheduler(threading.Thread): if layout_state is None: # Reconfigure only tenants w/o an existing layout state ctx = self.createZKContext(tlock, self.log) - self._reconfigureTenant(ctx, min_ltimes, tenant) + self._reconfigureTenant(ctx, min_ltimes, -1, tenant) self._reportInitialStats(tenant) else: self.local_layout_state[tenant_name] = layout_state @@ -1422,6 +1428,7 @@ class Scheduler(threading.Thread): ctx = self.createZKContext(lock, self.log) if tenant is not None: self._reconfigureTenant(ctx, min_ltimes, + -1, tenant, old_tenant) else: self._reconfigureDeleteTenant(ctx, old_tenant) @@ -1485,6 +1492,7 @@ class Scheduler(threading.Thread): tenant = self.abide.tenants[event.tenant_name] ctx = self.createZKContext(lock, self.log) self._reconfigureTenant(ctx, min_ltimes, + event.trigger_event_ltime, tenant, old_tenant) duration = round(time.monotonic() - start, 3) self.log.info("Tenant reconfiguration complete for %s (duration: %s " @@ -1639,7 +1647,8 @@ class Scheduler(threading.Thread): request) self.cancelJob(build_set, request_job) - def _reconfigureTenant(self, context, min_ltimes, tenant, + def _reconfigureTenant(self, context, min_ltimes, + last_reconfigure_event_ltime, tenant, old_tenant=None): # This is called from _doReconfigureEvent while holding the # layout lock @@ -1666,10 +1675,29 @@ class Scheduler(threading.Thread): for s in self.connections.getSources() } + # Make sure last_reconfigure_event_ltime never goes backward + old_layout_state = self.tenant_layout_state.get(tenant.name) + if old_layout_state: + if (old_layout_state.last_reconfigure_event_ltime > + last_reconfigure_event_ltime): + self.log.debug("Setting layout state last reconfigure ltime " + "to previous ltime %s which is newer than %s", + old_layout_state.last_reconfigure_event_ltime, + last_reconfigure_event_ltime) + last_reconfigure_event_ltime =\ + old_layout_state.last_reconfigure_event_ltime + if last_reconfigure_event_ltime < 0: + last_reconfigure_event_ltime = self.zk_client.getCurrentLtime() + self.log.debug("Setting layout state last reconfigure ltime " + "to current ltime %s", last_reconfigure_event_ltime) + else: + self.log.debug("Setting layout state last reconfigure ltime " + "to %s", last_reconfigure_event_ltime) layout_state = LayoutState( tenant_name=tenant.name, hostname=self.hostname, last_reconfigured=int(time.time()), + last_reconfigure_event_ltime=last_reconfigure_event_ltime, uuid=tenant.layout.uuid, branch_cache_min_ltimes=branch_cache_min_ltimes, ) @@ -2178,6 +2206,8 @@ class Scheduler(threading.Thread): "Unable to refresh pipeline change list for %s", pipeline.name) + # Get the ltime of the last reconfiguration event + self.trigger_events[tenant.name].refreshMetadata() for event in self.trigger_events[tenant.name]: log = get_annotated_logger(self.log, event.zuul_event_id) log.debug("Forwarding trigger event %s", event) @@ -2266,7 +2296,15 @@ class Scheduler(threading.Thread): # out cached data for this project and perform a # reconfiguration. self.reconfigureTenant(tenant, change.project, event) - + # This will become the new required minimum event ltime + # for every trigger event processed after the + # reconfiguration, so make sure we update it after having + # submitted the reconfiguration event. + self.trigger_events[tenant.name].last_reconfigure_event_ltime =\ + event.zuul_event_ltime + + event.min_reconfigure_ltime = self.trigger_events[ + tenant.name].last_reconfigure_event_ltime for pipeline in tenant.layout.pipelines.values(): if ( pipeline.manager.eventMatches(event, change) @@ -2281,6 +2319,21 @@ class Scheduler(threading.Thread): if self._stopped: return log = get_annotated_logger(self.log, event.zuul_event_id) + if not isinstance(event, SupercedeEvent): + local_state = self.local_layout_state[tenant.name] + last_ltime = local_state.last_reconfigure_event_ltime + # The event tells us the ltime of the most recent + # reconfiguration event up to that point. If our local + # layout state wasn't generated by an event after that + # time, then we are too out of date to process this event. + # Abort now and wait for an update. + if (event.min_reconfigure_ltime > -1 and + event.min_reconfigure_ltime > last_ltime): + log.debug("Trigger event minimum reconfigure ltime of %s " + "newer than current reconfigure ltime of %s, " + "aborting early", + event.min_reconfigure_ltime, last_ltime) + return log.debug("Processing trigger event %s", event) try: if isinstance(event, SupercedeEvent): diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py index 8718f609c..52ffd582e 100644 --- a/zuul/zk/event_queues.py +++ b/zuul/zk/event_queues.py @@ -776,7 +776,7 @@ class TriggerEventQueue(ZooKeeperEventQueue): self._put(data) def __iter__(self): - for data, ack_ref, _ in self._iterEvents(): + for data, ack_ref, zstat in self._iterEvents(): try: if (data["driver_name"] is None and data["event_type"] == "SupercedeEvent"): @@ -793,6 +793,9 @@ class TriggerEventQueue(ZooKeeperEventQueue): event = event_class.fromDict(event_data) event.ack_ref = ack_ref event.driver_name = data["driver_name"] + # Initialize the logical timestamp if not valid + if event.zuul_event_ltime is None: + event.zuul_event_ltime = zstat.creation_transaction_id yield event @@ -803,6 +806,28 @@ class TenantTriggerEventQueue(TriggerEventQueue): queue_root = TENANT_TRIGGER_ROOT.format( tenant=tenant_name) super().__init__(client, queue_root, connections) + self.metadata = {} + + def _setQueueMetadata(self): + encoded_data = json.dumps( + self.metadata, sort_keys=True).encode("utf-8") + self.kazoo_client.set(self.queue_root, encoded_data) + + def refreshMetadata(self): + data, zstat = self.kazoo_client.get(self.queue_root) + try: + self.metadata = json.loads(data) + except json.JSONDecodeError: + self.metadata = {} + + @property + def last_reconfigure_event_ltime(self): + return self.metadata.get('last_reconfigure_event_ltime', -1) + + @last_reconfigure_event_ltime.setter + def last_reconfigure_event_ltime(self, val): + self.metadata['last_reconfigure_event_ltime'] = val + self._setQueueMetadata() @classmethod def createRegistry(cls, client, connections): diff --git a/zuul/zk/layout.py b/zuul/zk/layout.py index 386a93cc4..533226767 100644 --- a/zuul/zk/layout.py +++ b/zuul/zk/layout.py @@ -49,12 +49,15 @@ class LayoutState: """ def __init__(self, tenant_name, hostname, last_reconfigured, uuid, - branch_cache_min_ltimes, ltime=-1): + branch_cache_min_ltimes, last_reconfigure_event_ltime, + ltime=-1): self.uuid = uuid self.ltime = ltime self.tenant_name = tenant_name self.hostname = hostname self.last_reconfigured = last_reconfigured + self.last_reconfigure_event_ltime =\ + last_reconfigure_event_ltime self.branch_cache_min_ltimes = branch_cache_min_ltimes def toDict(self): @@ -62,6 +65,8 @@ class LayoutState: "tenant_name": self.tenant_name, "hostname": self.hostname, "last_reconfigured": self.last_reconfigured, + "last_reconfigure_event_ltime": + self.last_reconfigure_event_ltime, "uuid": self.uuid, "branch_cache_min_ltimes": self.branch_cache_min_ltimes, } @@ -74,6 +79,7 @@ class LayoutState: data["last_reconfigured"], data.get("uuid"), data.get("branch_cache_min_ltimes"), + data.get("last_reconfigure_event_ltime", -1), data.get("ltime", -1), ) |