diff options
author | aisch <me+bosch@aitmp.com> | 2016-02-16 21:30:38 -0800 |
---|---|---|
committer | aisch <me+bosch@aitmp.com> | 2016-02-16 21:30:38 -0800 |
commit | d7522b0fb79bffbe10a2548658a48829dd1a5c33 (patch) | |
tree | 53b9e5f662a0f583f82513c6b4f6e549f7088478 /kafka/coordinator/base.py | |
parent | 9f0db5d38b444f5a93da7bed4a19114aff8701e8 (diff) | |
download | kafka-python-d7522b0fb79bffbe10a2548658a48829dd1a5c33.tar.gz |
break up some circular references and close client wake pipe on __del__
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r-- | kafka/coordinator/base.py | 12 |
1 files changed, 11 insertions, 1 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 6efdfd0..c49c38b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -2,6 +2,7 @@ import abc import copy import logging import time +import weakref import six @@ -85,9 +86,12 @@ class BaseCoordinator(object): self.rejoin_needed = True self.needs_join_prepare = True self.heartbeat = Heartbeat(**self.config) - self.heartbeat_task = HeartbeatTask(self) + self.heartbeat_task = HeartbeatTask(weakref.proxy(self)) #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) + def __del__(self): + self.heartbeat_task.disable() + @abc.abstractmethod def protocol_type(self): """ @@ -572,6 +576,12 @@ class HeartbeatTask(object): self._client = coordinator._client self._request_in_flight = False + def disable(self): + try: + self._client.unschedule(self) + except KeyError: + pass + def reset(self): # start or restart the heartbeat task to be executed at the next chance self._heartbeat.reset_session_timeout() |