-rw-r--r--  tests/unit/test_event_queues.py |  40
-rw-r--r--  zuul/configloader.py            |  10
-rw-r--r--  zuul/merger/server.py           |  21
-rw-r--r--  zuul/model.py                   |   4
-rw-r--r--  zuul/nodepool.py                |   2
-rw-r--r--  zuul/rpclistener.py             |   2
-rw-r--r--  zuul/zk/__init__.py             |   1
-rw-r--r--  zuul/zk/event_queues.py         | 289
8 files changed, 261 insertions, 108 deletions
diff --git a/tests/unit/test_event_queues.py b/tests/unit/test_event_queues.py
index 61c7ba4bf..01a1d160a 100644
--- a/tests/unit/test_event_queues.py
+++ b/tests/unit/test_event_queues.py
@@ -19,7 +19,7 @@ import testtools
 from zuul import model
 from zuul.driver import Driver, TriggerInterface
 from zuul.lib.connections import ConnectionRegistry
-from zuul.zk import ZooKeeperClient, event_queues
+from zuul.zk import ZooKeeperClient, event_queues, sharding

 from tests.base import BaseTestCase, iterate_timeout

@@ -128,6 +128,44 @@ class TestTriggerEventQueue(EventQueueBaseTestCase):
         self.driver = DummyDriver()
         self.connections.registerDriver(self.driver)

+    def test_sharded_tenant_trigger_events(self):
+        # Test enqueue/dequeue of the tenant trigger event queue.
+        queue = event_queues.TenantTriggerEventQueue(
+            self.zk_client, self.connections, "tenant"
+        )
+
+        self.assertEqual(len(queue), 0)
+        self.assertFalse(queue.hasEvents())
+
+        event = DummyTriggerEvent()
+        data = {'test': "x" * (sharding.NODE_BYTE_SIZE_LIMIT + 1)}
+        event.data = data
+
+        queue.put(self.driver.driver_name, event)
+        queue.put(self.driver.driver_name, event)
+
+        self.assertEqual(len(queue), 2)
+        self.assertTrue(queue.hasEvents())
+
+        processed = 0
+        for event in queue:
+            self.assertIsInstance(event, DummyTriggerEvent)
+            processed += 1
+
+        self.assertEqual(processed, 2)
+        self.assertEqual(len(queue), 2)
+        self.assertTrue(queue.hasEvents())
+
+        acked = 0
+        for event in queue:
+            queue.ack(event)
+            self.assertEqual(event.data, data)
+            acked += 1
+
+        self.assertEqual(acked, 2)
+        self.assertEqual(len(queue), 0)
+        self.assertFalse(queue.hasEvents())
+
     def test_tenant_trigger_events(self):
         # Test enqueue/dequeue of the tenant trigger event queue.
         queue = event_queues.TenantTriggerEventQueue(
diff --git a/zuul/configloader.py b/zuul/configloader.py
index b56d3ca70..cb589be17 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -2347,13 +2347,19 @@ class ConfigLoader(object):
         """
         if tenant_name not in unparsed_abide.tenants:
-            del abide.tenants[tenant_name]
+            # Copy tenants dictionary to not break concurrent iterations.
+            tenants = abide.tenants.copy()
+            del tenants[tenant_name]
+            abide.tenants = tenants
             return None

         unparsed_config = unparsed_abide.tenants[tenant_name]
         new_tenant = self.tenant_parser.fromYaml(
             abide, unparsed_config, ansible_manager, min_ltimes)
-        abide.tenants[tenant_name] = new_tenant
+        # Copy tenants dictionary to not break concurrent iterations.
+        tenants = abide.tenants.copy()
+        tenants[tenant_name] = new_tenant
+        abide.tenants = tenants
         if len(new_tenant.layout.loading_errors):
             self.log.warning(
                 "%s errors detected during %s tenant configuration loading",
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 6b4fd14ca..e5a81e457 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -16,6 +16,7 @@ import json
 import logging
 import os
 import socket
+import sys
 import threading
 from abc import ABCMeta
 from configparser import ConfigParser
@@ -188,7 +189,10 @@ class BaseMergeServer(metaclass=ABCMeta):
         else:
             result = dict(updated=True, files=files)

-        job.sendWorkComplete(json.dumps(result))
+        payload = json.dumps(result)
+        self.log.debug("Completed cat job %s: payload size: %s",
+                       job.unique, sys.getsizeof(payload))
+        job.sendWorkComplete(payload)

     def merge(self, job):
         self.log.debug("Got merge job: %s" % job.unique)
@@ -212,7 +216,10 @@ class BaseMergeServer(metaclass=ABCMeta):
             (result['commit'], result['files'], result['repo_state'],
              recent, orig_commit) = ret
         result['zuul_event_id'] = zuul_event_id
-        job.sendWorkComplete(json.dumps(result))
+        payload = json.dumps(result)
+        self.log.debug("Completed merge job %s: payload size: %s",
+                       job.unique, sys.getsizeof(payload))
+        job.sendWorkComplete(payload)

     def refstate(self, job):
         self.log.debug("Got refstate job: %s" % job.unique)
@@ -225,7 +232,10 @@ class BaseMergeServer(metaclass=ABCMeta):
                       repo_state=repo_state,
                       item_in_branches=item_in_branches)
         result['zuul_event_id'] = zuul_event_id
-        job.sendWorkComplete(json.dumps(result))
+        payload = json.dumps(result)
+        self.log.debug("Completed refstate job %s: payload size: %s",
+                       job.unique, sys.getsizeof(payload))
+        job.sendWorkComplete(payload)

     def fileschanges(self, job):
         self.log.debug("Got fileschanges job: %s" % job.unique)
@@ -250,7 +260,10 @@ class BaseMergeServer(metaclass=ABCMeta):
             result = dict(updated=True, files=files)

         result['zuul_event_id'] = zuul_event_id
-        job.sendWorkComplete(json.dumps(result))
+        payload = json.dumps(result)
+        self.log.debug("Completed fileschanges job %s: payload size: %s",
+                       job.unique, sys.getsizeof(payload))
+        job.sendWorkComplete(payload)


 class MergeServer(BaseMergeServer):
diff --git a/zuul/model.py b/zuul/model.py
index 5d071cde2..13c69c8b6 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -5436,8 +5436,8 @@ class UnparsedBranchCache(object):

 class Abide(object):
     def __init__(self):
-        self.admin_rules = OrderedDict()
-        self.tenants = OrderedDict()
+        self.admin_rules = {}
+        self.tenants = {}
         # tenant -> project -> list(tpcs)
         # The project TPCs are stored as a list as we don't check for
         # duplicate projects here.
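The configloader and model changes above replace in-place mutation of abide.tenants with a copy-and-swap, so a thread iterating over the old dictionary never sees it change size. A minimal sketch of the pattern (illustrative only, not Zuul code; the class and names are invented):

    class Registry:
        """Copy-on-write mapping: readers iterate a stable snapshot."""

        def __init__(self):
            self.tenants = {}

        def set(self, name, value):
            # Mutating self.tenants in place while another thread
            # iterates it can raise "RuntimeError: dictionary changed
            # size during iteration".  Rebinding the attribute to a
            # modified copy is a single atomic reference swap, so
            # readers keep their old snapshot undisturbed.
            tenants = self.tenants.copy()
            tenants[name] = value
            self.tenants = tenants

        def names(self):
            # Grab one reference; iterating this snapshot is safe even
            # if a writer swaps in a new dict midway.
            return list(self.tenants)

This also fits the OrderedDict-to-dict change in model.py: plain dicts preserve insertion order since Python 3.7, and the RPC listener below now sorts tenant names explicitly rather than relying on insertion order.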
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index b5b3048a2..0cdeb4060 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -143,10 +143,10 @@ class Nodepool(object):
         log.info("Canceling node request %s", request)
         if not request.canceled:
             try:
+                request.canceled = True
                 self.zk_nodepool.deleteNodeRequest(request)
             except Exception:
                 log.exception("Error deleting node request:")
-            request.canceled = True

     def reviseRequest(self, request, relative_priority=None):
         '''Attempt to update the node request, if it is not currently being
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index c0be6e9ec..dc5374034 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -330,7 +330,7 @@ class RPCListener(RPCListenerBase):

     def handle_tenant_list(self, job):
         output = []
-        for tenant_name, tenant in self.sched.abide.tenants.items():
+        for tenant_name, tenant in sorted(self.sched.abide.tenants.items()):
             queue_size = 0
             for pipeline_name, pipeline in tenant.layout.pipelines.items():
                 for queue in pipeline.queues:
diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py
index e5ed73900..52597a4ef 100644
--- a/zuul/zk/__init__.py
+++ b/zuul/zk/__init__.py
@@ -194,6 +194,7 @@ class ZooKeeperClient(object):
         for res in results:
             if isinstance(res, Exception):
                 raise res
+        return results

     def getCurrentLtime(self):
         """Get the logical timestamp as seen by the Zookeeper cluster."""
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index 089690564..02b002813 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -29,7 +29,7 @@ from kazoo.recipe.election import Election

 from zuul import model
 from zuul.lib.collections import DefaultKeyDict
-from zuul.zk import ZooKeeperSimpleBase
+from zuul.zk import ZooKeeperSimpleBase, sharding

 RESULT_EVENT_TYPE_MAP = {
     "BuildCompletedEvent": model.BuildCompletedEvent,
@@ -49,7 +49,27 @@ MANAGEMENT_EVENT_TYPE_MAP = {
     "TenantReconfigureEvent": model.TenantReconfigureEvent,
 }

+# /zuul/events/tenant                  TENANT_ROOT
+#   /{tenant}                          TENANT_NAME_ROOT
+#     /management                      TENANT_MANAGEMENT_ROOT
+#     /trigger                         TENANT_TRIGGER_ROOT
+#     /pipelines                       PIPELINE_ROOT
+#       /{pipeline}                    PIPELINE_NAME_ROOT
+#         /management                  PIPELINE_MANAGEMENT_ROOT
+#         /trigger                     PIPELINE_TRIGGER_ROOT
+#         /result                      PIPELINE_RESULT_ROOT
+
 TENANT_ROOT = "/zuul/events/tenant"
+TENANT_NAME_ROOT = TENANT_ROOT + "/{tenant}"
+TENANT_MANAGEMENT_ROOT = TENANT_NAME_ROOT + "/management"
+TENANT_TRIGGER_ROOT = TENANT_NAME_ROOT + "/trigger"
+PIPELINE_ROOT = TENANT_NAME_ROOT + "/pipelines"
+PIPELINE_NAME_ROOT = PIPELINE_ROOT + "/{pipeline}"
+PIPELINE_MANAGEMENT_ROOT = PIPELINE_NAME_ROOT + "/management"
+PIPELINE_TRIGGER_ROOT = PIPELINE_NAME_ROOT + "/trigger"
+PIPELINE_RESULT_ROOT = PIPELINE_NAME_ROOT + "/result"
+
+EVENT_DATA_ROOT = "/zuul/events/data"
 CONNECTION_ROOT = "/zuul/events/connection"

 # This is the path to the serialized form of the event in ZK (along
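The comment block above documents the new per-tenant/per-pipeline queue layout in ZooKeeper. For a concrete feel, here is how the templates expand (tenant and pipeline names are invented for illustration):

    TENANT_TRIGGER_ROOT = "/zuul/events/tenant/{tenant}/trigger"
    PIPELINE_RESULT_ROOT = (
        "/zuul/events/tenant/{tenant}/pipelines/{pipeline}/result")

    # Hypothetical tenant "example" with a "check" pipeline:
    print(TENANT_TRIGGER_ROOT.format(tenant="example"))
    # /zuul/events/tenant/example/trigger
    print(PIPELINE_RESULT_ROOT.format(tenant="example", pipeline="check"))
    # /zuul/events/tenant/example/pipelines/check/result

Splitting management, trigger, and result events into separate subtrees lets the EventWatcher register one ChildrenWatch per queue instead of filtering a shared queue by event-name prefix, which is why the EventPrefix-based SchedulerEventQueue is removed later in this diff.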
@@ -88,33 +108,39 @@ class EventWatcher(ZooKeeperSimpleBase):

     def _tenantWatch(self, tenants):
         for tenant_name in tenants:
-            tenant_path = "/".join((TENANT_ROOT, tenant_name))
-
-            if tenant_path in self.watched_tenants:
+            if tenant_name in self.watched_tenants:
                 continue

-            events_path = f"{tenant_path}/events"
-            self.kazoo_client.ensure_path(events_path)
-            self.kazoo_client.ChildrenWatch(
-                events_path, self._eventWatch, send_event=True)
+            for path in (TENANT_MANAGEMENT_ROOT,
+                         TENANT_TRIGGER_ROOT):
+                path = path.format(tenant=tenant_name)
+                self.kazoo_client.ensure_path(path)
+                self.kazoo_client.ChildrenWatch(
+                    path, self._eventWatch, send_event=True)

-            pipelines_path = f"{tenant_path}/pipelines"
+            pipelines_path = PIPELINE_ROOT.format(tenant=tenant_name)
             self.kazoo_client.ensure_path(pipelines_path)
             self.kazoo_client.ChildrenWatch(
                 pipelines_path, self._makePipelineWatcher(tenant_name))
-            self.watched_tenants.add(tenant_path)
+            self.watched_tenants.add(tenant_name)

     def _pipelineWatch(self, tenant_name, pipelines):
         for pipeline_name in pipelines:
-            pipeline_path = "/".join((TENANT_ROOT, tenant_name, "pipelines",
-                                      pipeline_name))
-            if pipeline_path in self.watched_pipelines:
+            key = (tenant_name, pipeline_name)
+            if key in self.watched_pipelines:
                 continue

-            self.kazoo_client.ChildrenWatch(
-                pipeline_path, self._eventWatch, send_event=True)
-            self.watched_pipelines.add(pipeline_path)
+            for path in (PIPELINE_MANAGEMENT_ROOT,
+                         PIPELINE_TRIGGER_ROOT,
+                         PIPELINE_RESULT_ROOT):
+                path = path.format(tenant=tenant_name,
+                                   pipeline=pipeline_name)
+
+                self.kazoo_client.ensure_path(path)
+                self.kazoo_client.ChildrenWatch(
+                    path, self._eventWatch, send_event=True)
+            self.watched_pipelines.add(key)

     def _eventWatch(self, event_list, event=None):
         if event is None:
@@ -127,7 +153,32 @@ class EventWatcher(ZooKeeperSimpleBase):


 class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
-    """Abstract API for events via ZooKeeper"""
+    """Abstract API for events via ZooKeeper
+
+    The lifecycle of a global (not pipeline-specific) event is:
+
+    * Serialized form of event added to ZK queue.
+
+    * During queue processing, events are de-serialized and
+      AbstractEvent subclasses are instantiated.  An EventAckRef is
+      associated with the event instance in order to maintain the link
+      to the serialized form.
+
+    * When event processing is complete, the EventAckRef is used to
+      delete the original event.  If the event requires a result
+      (e.g., a management event that returns data) the result will be
+      written to a pre-determined location.  A future can watch for
+      the result to appear at that location.
+
+    Pipeline specific events usually start out as global events, but
+    upon processing, may be forwarded to pipeline-specific queues.  In
+    these cases, the original event will be deleted as above, and a
+    new, identical event will be created in the pipeline-specific
+    queue.  If the event expects a result, no result will be written
+    upon forwarding; the result will only be written when the
+    forwarded event is processed.
+
+    """

     log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue")

@@ -135,13 +186,15 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
         super().__init__(client)
         self.event_root = event_root
         self.kazoo_client.ensure_path(self.event_root)
+        self.kazoo_client.ensure_path(EVENT_DATA_ROOT)

     def _listEvents(self):
         return self.kazoo_client.get_children(self.event_root)

     def __len__(self):
         try:
-            return len(self._listEvents())
+            data, stat = self.kazoo_client.get(self.event_root)
+            return stat.children_count
         except NoNodeError:
             return 0
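Note that __len__ now reads the queue length from the parent znode's stat rather than listing every child, which avoids transferring all the (potentially many) sequence-node names just to count them. A rough standalone equivalent using kazoo (host and path are made up, and a reachable ZooKeeper is assumed):

    from kazoo.client import KazooClient

    client = KazooClient(hosts="localhost:2181")
    client.start()

    path = "/zuul/events/tenant/example/trigger"
    client.ensure_path(path)

    # get() returns (data, ZnodeStat); the stat already carries the
    # child count, so no get_children() round-trip over the full name
    # list is needed.
    data, stat = client.get(path)
    print(stat.children_count)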
@@ -153,26 +206,63 @@
         # event object when it was de-serialized.
         if not event.ack_ref:
             raise RuntimeError("Cannot ack event %s without reference", event)
-        try:
-            self.kazoo_client.delete(
-                event.ack_ref.path,
-                version=event.ack_ref.version,
-                recursive=True,
-            )
-        except NoNodeError:
+        if not self._remove(event.ack_ref.path, event.ack_ref.version):
             self.log.warning("Event %s was already acknowledged", event)

-    @property
-    def _event_create_path(self):
-        return f"{self.event_root}/"
-
     def _put(self, data):
-        return self.kazoo_client.create(
-            self._event_create_path,
-            json.dumps(data).encode("utf-8"),
-            sequence=True,
-            makepath=True,
-        )
+        # Within a transaction, we need something after the / in order
+        # to cause the sequence node to be created under the node
+        # before the /.  So we use "seq" for that.  This will produce
+        # paths like event_root/seq-0000.
+        event_path = f"{self.event_root}/seq"
+
+        # Event data can be large, so we want to shard it.  But events
+        # also need to be atomic (we don't want an event listener to
+        # start processing a half-stored event).  A natural solution
+        # is to use a ZK transaction to write the sharded data along
+        # with the event.  However, our events are sequence nodes in
+        # order to maintain ordering, and we can't use our sharding
+        # helper to write shards underneath a sequence node inside the
+        # transaction because we don't know the path of the sequence
+        # node within the transaction.  To resolve this, we store the
+        # event data in two places: the event itself and associated
+        # metadata are in the event queue as a single sequence node.
+        # The event data are stored in a separate tree under a uuid.
+        # The event metadata includes the UUID of the data.  We call
+        # the separated data "side channel data" to indicate it's
+        # stored outside the main event queue.
+        #
+        # To make the API simpler to work with, we assume "event_data"
+        # contains the bulk of the data.  We extract it here into the
+        # side channel data, then in _iterEvents we re-constitute it
+        # into the dictionary.
+
+        side_channel_data = None
+        encoded_data = json.dumps(data).encode("utf-8")
+        if (len(encoded_data) > sharding.NODE_BYTE_SIZE_LIMIT
+                and 'event_data' in data):
+            # Get a unique data node
+            data_id = str(uuid.uuid4())
+            data_root = f'{EVENT_DATA_ROOT}/{data_id}'
+            data_path = f'{data_root}/seq'
+            side_channel_data = json.dumps(data['event_data']).encode("utf-8")
+            data = data.copy()
+            del data['event_data']
+            data['event_data_path'] = data_root
+            encoded_data = json.dumps(data).encode("utf-8")
+
+            tr = self.kazoo_client.transaction()
+            tr.create(data_root)
+
+            with sharding.BufferedShardWriter(tr, data_path) as stream:
+                stream.write(side_channel_data)
+
+            tr.create(event_path, encoded_data, sequence=True)
+            resp = self.client.commitTransaction(tr)
+            return resp[-1]
+        else:
+            return self.kazoo_client.create(
+                event_path, encoded_data, sequence=True)

     def _iterEvents(self):
         try:
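To make the comment in _put concrete: when the serialized event exceeds the single-znode limit, the bulky 'event_data' value is parked under a uuid-named node in /zuul/events/data, and the queue entry keeps only a pointer to it; a single transaction commits both, so a reader never observes a half-stored event. A simplified sketch of just the split step (illustrative only; NODE_BYTE_SIZE_LIMIT here stands in for Zuul's sharding constant, and the real code additionally writes the bytes in shards inside the transaction):

    import json
    import uuid

    NODE_BYTE_SIZE_LIMIT = 1024 * 1024  # ZooKeeper's default ~1 MB limit
    EVENT_DATA_ROOT = "/zuul/events/data"

    def split_event(data):
        """Return (queue_entry, side_channel_bytes_or_None)."""
        encoded = json.dumps(data).encode("utf-8")
        if len(encoded) <= NODE_BYTE_SIZE_LIMIT or 'event_data' not in data:
            return data, None
        data = data.copy()
        # Park the bulk under a fresh uuid and leave a pointer behind.
        side = json.dumps(data.pop('event_data')).encode("utf-8")
        data['event_data_path'] = f"{EVENT_DATA_ROOT}/{uuid.uuid4()}"
        return data, side

    entry, side = split_event({'event_data': {'blob': 'x' * 2_000_000}})
    assert 'event_data' not in entry and side is not None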
@@ -184,7 +274,8 @@

         for event_id in events:
             path = "/".join((self.event_root, event_id))
-            # TODO: implement sharding of large events
+
+            # Load the event metadata
             data, zstat = self.kazoo_client.get(path)
             try:
                 event = json.loads(data)
@@ -192,57 +283,54 @@
             except json.JSONDecodeError:
                 self.log.exception("Malformed event data in %s", path)
                 self._remove(path)
                 continue
-            yield event, EventAckRef(path, zstat.version), zstat
-
-    def _remove(self, path, version=UNKNOWN_ZVERSION):
-        with suppress(NoNodeError):
-            self.kazoo_client.delete(path, version=version, recursive=True)
-
-
-class SchedulerEventQueue(ZooKeeperEventQueue):
-    """Abstract API for tenant specific events via ZooKeeper
-
-    The lifecycle of a global (not pipeline-specific) event is:
-
-    * Serialized form of event added to ZK queue.
-
-    * During queue processing, events are de-serialized and
-      AbstractEvent subclasses are instantiated.  An EventAckRef is
-      associated with the event instance in order to maintain the link
-      to the serialized form.
-
-    * When event processing is complete, the EventAckRef is used to
-      delete the original event.  If the event requires a result
-      (e.g., a management event that returns data) the result will be
-      written to a pre-determined location.  A future can watch for
-      the result to appear at that location.
+
+            # Load the event data (if present); if that fails, the
+            # event is corrupt; delete and continue.
+            side_channel_path = event.get('event_data_path')
+            if side_channel_path:
+                try:
+                    with sharding.BufferedShardReader(
+                            self.kazoo_client, side_channel_path) as stream:
+                        side_channel_data = stream.read()
+                except NoNodeError:
+                    self.log.exception("Side channel data for %s "
+                                       "not found at %s",
+                                       path, side_channel_path)
+                    self._remove(path)
+                    continue

-    Pipeline specific events usually start out as global events, but
-    upon processing, may be forwarded to pipeline-specific queues.  In
-    these cases, the original event will be deleted as above, and a
-    new, identical event will be created in the pipeline-specific
-    queue.  If the event expects a result, no result will be written
-    upon forwarding; the result will only be written when the
-    forwarded event is processed.
+                try:
+                    event_data = json.loads(side_channel_data)
+                except json.JSONDecodeError:
+                    self.log.exception("Malformed side channel "
+                                       "event data in %s",
+                                       side_channel_path)
+                    self._remove(path)
+                    continue
+                event['event_data'] = event_data

-    """
+            yield event, EventAckRef(path, zstat.version), zstat

-    log = logging.getLogger("zuul.zk.event_queues.SchedulerEventQueue")
+    def _remove(self, path, version=UNKNOWN_ZVERSION):
+        try:
+            # Find the side channel path

-    def __init__(self, client, event_root, event_prefix):
-        super().__init__(client, event_root)
-        self.event_prefix = event_prefix
+            side_channel_path = None
+            data, zstat = self.kazoo_client.get(path)
+            try:
+                event = json.loads(data)
+                side_channel_path = event.get('event_data_path')
+            except json.JSONDecodeError:
+                pass

-    def _listEvents(self):
-        return [
-            e
-            for e in self.kazoo_client.get_children(self.event_root)
-            if e.startswith(self.event_prefix.value)
-        ]
+            if side_channel_path:
+                with suppress(NoNodeError):
+                    self.kazoo_client.delete(side_channel_path, recursive=True)

-    @property
-    def _event_create_path(self) -> str:
-        return "{}/{}-".format(self.event_root, self.event_prefix.value)
+            self.kazoo_client.delete(path, version=version, recursive=True)
+            return True
+        except NoNodeError:
+            return False


 class ManagementEventResultFuture(ZooKeeperSimpleBase):
@@ -292,7 +380,7 @@ class ManagementEventResultFuture(ZooKeeperSimpleBase):
         return True


-class ManagementEventQueue(SchedulerEventQueue):
+class ManagementEventQueue(ZooKeeperEventQueue):
     """Management events via ZooKeeper"""

     RESULTS_ROOT = "/zuul/results/management"
@@ -384,9 +472,10 @@ class PipelineManagementEventQueue(ManagementEventQueue):
     )

     def __init__(self, client, tenant_name, pipeline_name):
-        event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
-                               pipeline_name))
-        super().__init__(client, event_root, EventPrefix.MANAGEMENT)
+        event_root = PIPELINE_MANAGEMENT_ROOT.format(
+            tenant=tenant_name,
+            pipeline=pipeline_name)
+        super().__init__(client, event_root)

     @classmethod
     def createRegistry(cls, client):
@@ -411,8 +500,9 @@ class TenantManagementEventQueue(ManagementEventQueue):

     log = logging.getLogger("zuul.zk.event_queues.TenantManagementEventQueue")

     def __init__(self, client, tenant_name):
-        event_root = "/".join((TENANT_ROOT, tenant_name, "events"))
-        super().__init__(client, event_root, EventPrefix.MANAGEMENT)
+        event_root = TENANT_MANAGEMENT_ROOT.format(
+            tenant=tenant_name)
+        super().__init__(client, event_root)

     def ackWithoutResult(self, event):
         """
@@ -437,15 +527,16 @@ class TenantManagementEventQueue(ManagementEventQueue):
         return DefaultKeyDict(lambda t: cls(client, t))


-class PipelineResultEventQueue(SchedulerEventQueue):
+class PipelineResultEventQueue(ZooKeeperEventQueue):
     """Result events via ZooKeeper"""

     log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue")

     def __init__(self, client, tenant_name, pipeline_name):
-        event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
-                               pipeline_name))
-        super().__init__(client, event_root, EventPrefix.RESULT)
+        event_root = PIPELINE_RESULT_ROOT.format(
+            tenant=tenant_name,
+            pipeline=pipeline_name)
+        super().__init__(client, event_root)

     @classmethod
     def createRegistry(cls, client):
@@ -486,14 +577,14 @@
             yield event


-class TriggerEventQueue(SchedulerEventQueue):
+class TriggerEventQueue(ZooKeeperEventQueue):
     """Trigger events via ZooKeeper"""

     log = logging.getLogger("zuul.zk.event_queues.TriggerEventQueue")

     def __init__(self, client, event_root, connections):
         self.connections = connections
-        super().__init__(client, event_root, EventPrefix.TRIGGER)
+        super().__init__(client, event_root)

     def put(self, driver_name, event):
         data = {
@@ -523,7 +614,8 @@ class TenantTriggerEventQueue(TriggerEventQueue):

     log = logging.getLogger("zuul.zk.event_queues.TenantTriggerEventQueue")

     def __init__(self, client, connections, tenant_name):
-        event_root = "/".join((TENANT_ROOT, tenant_name, "events"))
+        event_root = TENANT_TRIGGER_ROOT.format(
+            tenant=tenant_name)
         super().__init__(client, event_root, connections)

     @classmethod
@@ -544,8 +636,9 @@ class PipelineTriggerEventQueue(TriggerEventQueue):

     log = logging.getLogger("zuul.zk.event_queues.PipelineTriggerEventQueue")

     def __init__(self, client, tenant_name, pipeline_name, connections):
-        event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
-                               pipeline_name))
+        event_root = PIPELINE_TRIGGER_ROOT.format(
+            tenant=tenant_name,
+            pipeline=pipeline_name)
         super().__init__(client, event_root, connections)

     @classmethod
@@ -598,7 +691,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
     def put(self, data):
         self.log.debug("Submitting connection event to queue %s: %s",
                        self.event_root, data)
-        self._put(data)
+        self._put({'event_data': data})

     def __iter__(self):
         for data, ack_ref, _ in self._iterEvents():
@@ -606,7 +699,9 @@
             if not data:
                 self.log.warning("Malformed event found: %s", data)
                 self._remove(ack_ref.path)
                 continue
-            event = model.ConnectionEvent.fromDict(data)
+            # TODO: We can assume event_data exists after 4.6.1 is released
+            event = model.ConnectionEvent.fromDict(
+                data.get('event_data', data))
             event.ack_ref = ack_ref
             yield event
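The data.get('event_data', data) fallback keeps the connection queue readable across the upgrade: events written by an older scheduler are bare dicts, while new writers wrap the payload so _put can shard it. A tiny illustration of the compatibility read (values are hypothetical):

    def unwrap(data):
        # New writers wrap the payload; old writers stored it bare.
        return data.get('event_data', data)

    assert unwrap({'event_data': {'change': 123}}) == {'change': 123}  # new
    assert unwrap({'change': 123}) == {'change': 123}                  # old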