summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tests/unit/test_event_queues.py40
-rw-r--r--zuul/configloader.py10
-rw-r--r--zuul/merger/server.py21
-rw-r--r--zuul/model.py4
-rw-r--r--zuul/nodepool.py2
-rw-r--r--zuul/rpclistener.py2
-rw-r--r--zuul/zk/__init__.py1
-rw-r--r--zuul/zk/event_queues.py289
8 files changed, 261 insertions, 108 deletions
diff --git a/tests/unit/test_event_queues.py b/tests/unit/test_event_queues.py
index 61c7ba4bf..01a1d160a 100644
--- a/tests/unit/test_event_queues.py
+++ b/tests/unit/test_event_queues.py
@@ -19,7 +19,7 @@ import testtools
from zuul import model
from zuul.driver import Driver, TriggerInterface
from zuul.lib.connections import ConnectionRegistry
-from zuul.zk import ZooKeeperClient, event_queues
+from zuul.zk import ZooKeeperClient, event_queues, sharding
from tests.base import BaseTestCase, iterate_timeout
@@ -128,6 +128,44 @@ class TestTriggerEventQueue(EventQueueBaseTestCase):
self.driver = DummyDriver()
self.connections.registerDriver(self.driver)
+ def test_sharded_tenant_trigger_events(self):
+ # Test enqueue/dequeue of the tenant trigger event queue.
+ queue = event_queues.TenantTriggerEventQueue(
+ self.zk_client, self.connections, "tenant"
+ )
+
+ self.assertEqual(len(queue), 0)
+ self.assertFalse(queue.hasEvents())
+
+ event = DummyTriggerEvent()
+ data = {'test': "x" * (sharding.NODE_BYTE_SIZE_LIMIT + 1)}
+ event.data = data
+
+ queue.put(self.driver.driver_name, event)
+ queue.put(self.driver.driver_name, event)
+
+ self.assertEqual(len(queue), 2)
+ self.assertTrue(queue.hasEvents())
+
+ processed = 0
+ for event in queue:
+ self.assertIsInstance(event, DummyTriggerEvent)
+ processed += 1
+
+ self.assertEqual(processed, 2)
+ self.assertEqual(len(queue), 2)
+ self.assertTrue(queue.hasEvents())
+
+ acked = 0
+ for event in queue:
+ queue.ack(event)
+ self.assertEqual(event.data, data)
+ acked += 1
+
+ self.assertEqual(acked, 2)
+ self.assertEqual(len(queue), 0)
+ self.assertFalse(queue.hasEvents())
+
def test_tenant_trigger_events(self):
# Test enqueue/dequeue of the tenant trigger event queue.
queue = event_queues.TenantTriggerEventQueue(
diff --git a/zuul/configloader.py b/zuul/configloader.py
index b56d3ca70..cb589be17 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -2347,13 +2347,19 @@ class ConfigLoader(object):
"""
if tenant_name not in unparsed_abide.tenants:
- del abide.tenants[tenant_name]
+ # Copy tenants dictionary to not break concurrent iterations.
+ tenants = abide.tenants.copy()
+ del tenants[tenant_name]
+ abide.tenants = tenants
return None
unparsed_config = unparsed_abide.tenants[tenant_name]
new_tenant = self.tenant_parser.fromYaml(
abide, unparsed_config, ansible_manager, min_ltimes)
- abide.tenants[tenant_name] = new_tenant
+ # Copy tenants dictionary to not break concurrent iterations.
+ tenants = abide.tenants.copy()
+ tenants[tenant_name] = new_tenant
+ abide.tenants = tenants
if len(new_tenant.layout.loading_errors):
self.log.warning(
"%s errors detected during %s tenant configuration loading",
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 6b4fd14ca..e5a81e457 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -16,6 +16,7 @@ import json
import logging
import os
import socket
+import sys
import threading
from abc import ABCMeta
from configparser import ConfigParser
@@ -188,7 +189,10 @@ class BaseMergeServer(metaclass=ABCMeta):
else:
result = dict(updated=True, files=files)
- job.sendWorkComplete(json.dumps(result))
+ payload = json.dumps(result)
+ self.log.debug("Completed cat job %s: payload size: %s",
+ job.unique, sys.getsizeof(payload))
+ job.sendWorkComplete(payload)
def merge(self, job):
self.log.debug("Got merge job: %s" % job.unique)
@@ -212,7 +216,10 @@ class BaseMergeServer(metaclass=ABCMeta):
(result['commit'], result['files'], result['repo_state'],
recent, orig_commit) = ret
result['zuul_event_id'] = zuul_event_id
- job.sendWorkComplete(json.dumps(result))
+ payload = json.dumps(result)
+ self.log.debug("Completed merge job %s: payload size: %s",
+ job.unique, sys.getsizeof(payload))
+ job.sendWorkComplete(payload)
def refstate(self, job):
self.log.debug("Got refstate job: %s" % job.unique)
@@ -225,7 +232,10 @@ class BaseMergeServer(metaclass=ABCMeta):
repo_state=repo_state,
item_in_branches=item_in_branches)
result['zuul_event_id'] = zuul_event_id
- job.sendWorkComplete(json.dumps(result))
+ payload = json.dumps(result)
+ self.log.debug("Completed refstate job %s: payload size: %s",
+ job.unique, sys.getsizeof(payload))
+ job.sendWorkComplete(payload)
def fileschanges(self, job):
self.log.debug("Got fileschanges job: %s" % job.unique)
@@ -250,7 +260,10 @@ class BaseMergeServer(metaclass=ABCMeta):
result = dict(updated=True, files=files)
result['zuul_event_id'] = zuul_event_id
- job.sendWorkComplete(json.dumps(result))
+ payload = json.dumps(result)
+ self.log.debug("Completed fileschanges job %s: payload size: %s",
+ job.unique, sys.getsizeof(payload))
+ job.sendWorkComplete(payload)
class MergeServer(BaseMergeServer):
diff --git a/zuul/model.py b/zuul/model.py
index 5d071cde2..13c69c8b6 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -5436,8 +5436,8 @@ class UnparsedBranchCache(object):
class Abide(object):
def __init__(self):
- self.admin_rules = OrderedDict()
- self.tenants = OrderedDict()
+ self.admin_rules = {}
+ self.tenants = {}
# tenant -> project -> list(tpcs)
# The project TPCs are stored as a list as we don't check for
# duplicate projects here.
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index b5b3048a2..0cdeb4060 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -143,10 +143,10 @@ class Nodepool(object):
log.info("Canceling node request %s", request)
if not request.canceled:
try:
+ request.canceled = True
self.zk_nodepool.deleteNodeRequest(request)
except Exception:
log.exception("Error deleting node request:")
- request.canceled = True
def reviseRequest(self, request, relative_priority=None):
'''Attempt to update the node request, if it is not currently being
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index c0be6e9ec..dc5374034 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -330,7 +330,7 @@ class RPCListener(RPCListenerBase):
def handle_tenant_list(self, job):
output = []
- for tenant_name, tenant in self.sched.abide.tenants.items():
+ for tenant_name, tenant in sorted(self.sched.abide.tenants.items()):
queue_size = 0
for pipeline_name, pipeline in tenant.layout.pipelines.items():
for queue in pipeline.queues:
diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py
index e5ed73900..52597a4ef 100644
--- a/zuul/zk/__init__.py
+++ b/zuul/zk/__init__.py
@@ -194,6 +194,7 @@ class ZooKeeperClient(object):
for res in results:
if isinstance(res, Exception):
raise res
+ return results
def getCurrentLtime(self):
"""Get the logical timestamp as seen by the Zookeeper cluster."""
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index 089690564..02b002813 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -29,7 +29,7 @@ from kazoo.recipe.election import Election
from zuul import model
from zuul.lib.collections import DefaultKeyDict
-from zuul.zk import ZooKeeperSimpleBase
+from zuul.zk import ZooKeeperSimpleBase, sharding
RESULT_EVENT_TYPE_MAP = {
"BuildCompletedEvent": model.BuildCompletedEvent,
@@ -49,7 +49,27 @@ MANAGEMENT_EVENT_TYPE_MAP = {
"TenantReconfigureEvent": model.TenantReconfigureEvent,
}
+# /zuul/events/tenant TENANT_ROOT
+# /{tenant} TENANT_NAME_ROOT
+# /management TENANT_MANAGEMENT_ROOT
+# /trigger TENANT_TRIGGER_ROOT
+# /pipelines PIPELINE_ROOT
+# /{pipeline} PIPELINE_NAME_ROOT
+# /management PIPELINE_MANAGEMENT_ROOT
+# /trigger PIPELINE_TRIGGER_ROOT
+# /result PIPELINE_RESULT_ROOT
+
TENANT_ROOT = "/zuul/events/tenant"
+TENANT_NAME_ROOT = TENANT_ROOT + "/{tenant}"
+TENANT_MANAGEMENT_ROOT = TENANT_NAME_ROOT + "/management"
+TENANT_TRIGGER_ROOT = TENANT_NAME_ROOT + "/trigger"
+PIPELINE_ROOT = TENANT_NAME_ROOT + "/pipelines"
+PIPELINE_NAME_ROOT = PIPELINE_ROOT + "/{pipeline}"
+PIPELINE_MANAGEMENT_ROOT = PIPELINE_NAME_ROOT + "/management"
+PIPELINE_TRIGGER_ROOT = PIPELINE_NAME_ROOT + "/trigger"
+PIPELINE_RESULT_ROOT = PIPELINE_NAME_ROOT + "/result"
+
+EVENT_DATA_ROOT = "/zuul/events/data"
CONNECTION_ROOT = "/zuul/events/connection"
# This is the path to the serialized from of the event in ZK (along
@@ -88,33 +108,39 @@ class EventWatcher(ZooKeeperSimpleBase):
def _tenantWatch(self, tenants):
for tenant_name in tenants:
- tenant_path = "/".join((TENANT_ROOT, tenant_name))
-
- if tenant_path in self.watched_tenants:
+ if tenant_name in self.watched_tenants:
continue
- events_path = f"{tenant_path}/events"
- self.kazoo_client.ensure_path(events_path)
- self.kazoo_client.ChildrenWatch(
- events_path, self._eventWatch, send_event=True)
+ for path in (TENANT_MANAGEMENT_ROOT,
+ TENANT_TRIGGER_ROOT):
+ path = path.format(tenant=tenant_name)
+ self.kazoo_client.ensure_path(path)
+ self.kazoo_client.ChildrenWatch(
+ path, self._eventWatch, send_event=True)
- pipelines_path = f"{tenant_path}/pipelines"
+ pipelines_path = PIPELINE_ROOT.format(tenant=tenant_name)
self.kazoo_client.ensure_path(pipelines_path)
self.kazoo_client.ChildrenWatch(
pipelines_path, self._makePipelineWatcher(tenant_name))
- self.watched_tenants.add(tenant_path)
+ self.watched_tenants.add(tenant_name)
def _pipelineWatch(self, tenant_name, pipelines):
for pipeline_name in pipelines:
- pipeline_path = "/".join((TENANT_ROOT, tenant_name, "pipelines",
- pipeline_name))
- if pipeline_path in self.watched_pipelines:
+ key = (tenant_name, pipeline_name)
+ if key in self.watched_pipelines:
continue
- self.kazoo_client.ChildrenWatch(
- pipeline_path, self._eventWatch, send_event=True)
- self.watched_pipelines.add(pipeline_path)
+ for path in (PIPELINE_MANAGEMENT_ROOT,
+ PIPELINE_TRIGGER_ROOT,
+ PIPELINE_RESULT_ROOT):
+ path = path.format(tenant=tenant_name,
+ pipeline=pipeline_name)
+
+ self.kazoo_client.ensure_path(path)
+ self.kazoo_client.ChildrenWatch(
+ path, self._eventWatch, send_event=True)
+ self.watched_pipelines.add(key)
def _eventWatch(self, event_list, event=None):
if event is None:
@@ -127,7 +153,32 @@ class EventWatcher(ZooKeeperSimpleBase):
class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
- """Abstract API for events via ZooKeeper"""
+ """Abstract API for events via ZooKeeper
+
+ The lifecycle of a global (not pipeline-specific) event is:
+
+ * Serialized form of event added to ZK queue.
+
+ * During queue processing, events are de-serialized and
+ AbstractEvent subclasses are instantiated. An EventAckRef is
+ associated with the event instance in order to maintain the link
+ to the serialized form.
+
+ * When event processing is complete, the EventAckRef is used to
+ delete the original event. If the event requires a result
+ (e.g., a management event that returns data) the result will be
+ written to a pre-determined location. A future can watch for
+ the result to appear at that location.
+
+ Pipeline specific events usually start out as global events, but
+ upon processing, may be forwarded to pipeline-specific queues. In
+ these cases, the original event will be deleted as above, and a
+ new, identical event will be created in the pipeline-specific
+ queue. If the event expects a result, no result will be written
+ upon forwarding; the result will only be written when the
+ forwarded event is processed.
+
+ """
log = logging.getLogger("zuul.zk.event_queues.ZooKeeperEventQueue")
@@ -135,13 +186,15 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
super().__init__(client)
self.event_root = event_root
self.kazoo_client.ensure_path(self.event_root)
+ self.kazoo_client.ensure_path(EVENT_DATA_ROOT)
def _listEvents(self):
return self.kazoo_client.get_children(self.event_root)
def __len__(self):
try:
- return len(self._listEvents())
+ data, stat = self.kazoo_client.get(self.event_root)
+ return stat.children_count
except NoNodeError:
return 0
@@ -153,26 +206,63 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
# event object when it was de-serialized.
if not event.ack_ref:
raise RuntimeError("Cannot ack event %s without reference", event)
- try:
- self.kazoo_client.delete(
- event.ack_ref.path,
- version=event.ack_ref.version,
- recursive=True,
- )
- except NoNodeError:
+ if not self._remove(event.ack_ref.path, event.ack_ref.version):
self.log.warning("Event %s was already acknowledged", event)
- @property
- def _event_create_path(self):
- return f"{self.event_root}/"
-
def _put(self, data):
- return self.kazoo_client.create(
- self._event_create_path,
- json.dumps(data).encode("utf-8"),
- sequence=True,
- makepath=True,
- )
+ # Within a transaction, we need something after the / in order
+ # to cause the sequence node to be created under the node
+ # before the /. So we use "seq" for that. This will produce
+ # paths like event_root/seq-0000.
+ event_path = f"{self.event_root}/seq"
+
+ # Event data can be large, so we want to shard it. But events
+ # also need to be atomic (we don't want an event listener to
+ # start processing a half-stored event). A natural solution
+ # is to use a ZK transaction to write the sharded data along
+ # with the event. However, our events are sequence nodes in
+ # order to maintain ordering, and we can't use our sharding
+ # helper to write shards underneath a sequence node inside the
+ # transaction because we don't know the path of the sequence
+ # node within the transaction. To resolve this, we store the
+ # event data in two places: the event itself and associated
+ # metadata are in the event queue as a single sequence node.
+ # The event data are stored in a separate tree under a uuid.
+ # The event metadata includes the UUID of the data. We call
+ # the separated data "side channel data" to indicate it's
+ # stored outside the main event queue.
+ #
+ # To make the API simpler to work with, we assume "event_data"
+ # contains the bulk of the data. We extract it here into the
+ # side channel data, then in _iterEvents we re-constitute it
+ # into the dictionary.
+
+ side_channel_data = None
+ encoded_data = json.dumps(data).encode("utf-8")
+ if (len(encoded_data) > sharding.NODE_BYTE_SIZE_LIMIT
+ and 'event_data' in data):
+ # Get a unique data node
+ data_id = str(uuid.uuid4())
+ data_root = f'{EVENT_DATA_ROOT}/{data_id}'
+ data_path = f'{data_root}/seq'
+ side_channel_data = json.dumps(data['event_data']).encode("utf-8")
+ data = data.copy()
+ del data['event_data']
+ data['event_data_path'] = data_root
+ encoded_data = json.dumps(data).encode("utf-8")
+
+ tr = self.kazoo_client.transaction()
+ tr.create(data_root)
+
+ with sharding.BufferedShardWriter(tr, data_path) as stream:
+ stream.write(side_channel_data)
+
+ tr.create(event_path, encoded_data, sequence=True)
+ resp = self.client.commitTransaction(tr)
+ return resp[-1]
+ else:
+ return self.kazoo_client.create(
+ event_path, encoded_data, sequence=True)
def _iterEvents(self):
try:
@@ -184,7 +274,8 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
for event_id in events:
path = "/".join((self.event_root, event_id))
- # TODO: implement sharding of large events
+
+ # Load the event metadata
data, zstat = self.kazoo_client.get(path)
try:
event = json.loads(data)
@@ -192,57 +283,54 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
self.log.exception("Malformed event data in %s", path)
self._remove(path)
continue
- yield event, EventAckRef(path, zstat.version), zstat
-
- def _remove(self, path, version=UNKNOWN_ZVERSION):
- with suppress(NoNodeError):
- self.kazoo_client.delete(path, version=version, recursive=True)
-
-
-class SchedulerEventQueue(ZooKeeperEventQueue):
- """Abstract API for tenant specific events via ZooKeeper
- The lifecycle of a global (not pipeline-specific) event is:
-
- * Serialized form of event added to ZK queue.
-
- * During queue processing, events are de-serialized and
- AbstractEvent subclasses are instantiated. An EventAckRef is
- associated with the event instance in order to maintain the link
- to the serialized form.
-
- * When event processing is complete, the EventAckRef is used to
- delete the original event. If the event requires a result
- (e.g., a management event that returns data) the result will be
- written to a pre-determined location. A future can watch for
- the result to appear at that location.
+ # Load the event data (if present); if that fails, the
+ # event is corrupt; delete and continue.
+ side_channel_path = event.get('event_data_path')
+ if side_channel_path:
+ try:
+ with sharding.BufferedShardReader(
+ self.kazoo_client, side_channel_path) as stream:
+ side_channel_data = stream.read()
+ except NoNodeError:
+ self.log.exception("Side channel data for %s "
+ "not found at %s",
+ path, side_channel_path)
+ self._remove(path)
+ continue
- Pipeline specific events usually start out as global events, but
- upon processing, may be forwarded to pipeline-specific queues. In
- these cases, the original event will be deleted as above, and a
- new, identical event will be created in the pipeline-specific
- queue. If the event expects a result, no result will be written
- upon forwarding; the result will only be written when the
- forwarded event is processed.
+ try:
+ event_data = json.loads(side_channel_data)
+ except json.JSONDecodeError:
+ self.log.exception("Malformed side channel "
+ "event data in %s",
+ side_channel_path)
+ self._remove(path)
+ continue
+ event['event_data'] = event_data
- """
+ yield event, EventAckRef(path, zstat.version), zstat
- log = logging.getLogger("zuul.zk.event_queues.SchedulerEventQueue")
+ def _remove(self, path, version=UNKNOWN_ZVERSION):
+ try:
+ # Find the side channel path
- def __init__(self, client, event_root, event_prefix):
- super().__init__(client, event_root)
- self.event_prefix = event_prefix
+ side_channel_path = None
+ data, zstat = self.kazoo_client.get(path)
+ try:
+ event = json.loads(data)
+ side_channel_path = event.get('event_data_path')
+ except json.JSONDecodeError:
+ pass
- def _listEvents(self):
- return [
- e
- for e in self.kazoo_client.get_children(self.event_root)
- if e.startswith(self.event_prefix.value)
- ]
+ if side_channel_path:
+ with suppress(NoNodeError):
+ self.kazoo_client.delete(side_channel_path, recursive=True)
- @property
- def _event_create_path(self) -> str:
- return "{}/{}-".format(self.event_root, self.event_prefix.value)
+ self.kazoo_client.delete(path, version=version, recursive=True)
+ return True
+ except NoNodeError:
+ return False
class ManagementEventResultFuture(ZooKeeperSimpleBase):
@@ -292,7 +380,7 @@ class ManagementEventResultFuture(ZooKeeperSimpleBase):
return True
-class ManagementEventQueue(SchedulerEventQueue):
+class ManagementEventQueue(ZooKeeperEventQueue):
"""Management events via ZooKeeper"""
RESULTS_ROOT = "/zuul/results/management"
@@ -384,9 +472,10 @@ class PipelineManagementEventQueue(ManagementEventQueue):
)
def __init__(self, client, tenant_name, pipeline_name):
- event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
- pipeline_name))
- super().__init__(client, event_root, EventPrefix.MANAGEMENT)
+ event_root = PIPELINE_MANAGEMENT_ROOT.format(
+ tenant=tenant_name,
+ pipeline=pipeline_name)
+ super().__init__(client, event_root)
@classmethod
def createRegistry(cls, client):
@@ -411,8 +500,9 @@ class TenantManagementEventQueue(ManagementEventQueue):
log = logging.getLogger("zuul.zk.event_queues.TenantManagementEventQueue")
def __init__(self, client, tenant_name):
- event_root = "/".join((TENANT_ROOT, tenant_name, "events"))
- super().__init__(client, event_root, EventPrefix.MANAGEMENT)
+ event_root = TENANT_MANAGEMENT_ROOT.format(
+ tenant=tenant_name)
+ super().__init__(client, event_root)
def ackWithoutResult(self, event):
"""
@@ -437,15 +527,16 @@ class TenantManagementEventQueue(ManagementEventQueue):
return DefaultKeyDict(lambda t: cls(client, t))
-class PipelineResultEventQueue(SchedulerEventQueue):
+class PipelineResultEventQueue(ZooKeeperEventQueue):
"""Result events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.PipelineResultEventQueue")
def __init__(self, client, tenant_name, pipeline_name):
- event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
- pipeline_name))
- super().__init__(client, event_root, EventPrefix.RESULT)
+ event_root = PIPELINE_RESULT_ROOT.format(
+ tenant=tenant_name,
+ pipeline=pipeline_name)
+ super().__init__(client, event_root)
@classmethod
def createRegistry(cls, client):
@@ -486,14 +577,14 @@ class PipelineResultEventQueue(SchedulerEventQueue):
yield event
-class TriggerEventQueue(SchedulerEventQueue):
+class TriggerEventQueue(ZooKeeperEventQueue):
"""Trigger events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.TriggerEventQueue")
def __init__(self, client, event_root, connections):
self.connections = connections
- super().__init__(client, event_root, EventPrefix.TRIGGER)
+ super().__init__(client, event_root)
def put(self, driver_name, event):
data = {
@@ -523,7 +614,8 @@ class TenantTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.TenantTriggerEventQueue")
def __init__(self, client, connections, tenant_name):
- event_root = "/".join((TENANT_ROOT, tenant_name, "events"))
+ event_root = TENANT_TRIGGER_ROOT.format(
+ tenant=tenant_name)
super().__init__(client, event_root, connections)
@classmethod
@@ -544,8 +636,9 @@ class PipelineTriggerEventQueue(TriggerEventQueue):
log = logging.getLogger("zuul.zk.event_queues.PipelineTriggerEventQueue")
def __init__(self, client, tenant_name, pipeline_name, connections):
- event_root = "/".join((TENANT_ROOT, tenant_name, "pipelines",
- pipeline_name))
+ event_root = PIPELINE_TRIGGER_ROOT.format(
+ tenant=tenant_name,
+ pipeline=pipeline_name)
super().__init__(client, event_root, connections)
@classmethod
@@ -598,7 +691,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
def put(self, data):
self.log.debug("Submitting connection event to queue %s: %s",
self.event_root, data)
- self._put(data)
+ self._put({'event_data': data})
def __iter__(self):
for data, ack_ref, _ in self._iterEvents():
@@ -606,7 +699,9 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path)
continue
- event = model.ConnectionEvent.fromDict(data)
+ # TODO: We can assume event_data exists after 4.6.1 is released
+ event = model.ConnectionEvent.fromDict(
+ data.get('event_data', data))
event.ack_ref = ack_ref
yield event