diff options
author | Dana Powers <dana.powers@rd.io> | 2015-03-31 10:29:55 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-03-31 15:26:01 -0700 |
commit | 47989db113ff1603b081867f3914e0c0828dfc9c (patch) | |
tree | 82c9196a65b161a54538a84fe4af887c81e13497 /kafka/consumer/base.py | |
parent | 9fd08119170b64c56ea024d12ef6b0e6482d778b (diff) | |
download | kafka-python-47989db113ff1603b081867f3914e0c0828dfc9c.tar.gz |
Register atexit handlers for consumer and producer thread/multiprocess cleanup (not __del__)
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r-- | kafka/consumer/base.py | 28 |
1 files changed, 25 insertions, 3 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 0bbf46c..64d96ea 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import atexit import logging import numbers from threading import Lock @@ -75,6 +76,11 @@ class Consumer(object): for partition in partitions: self.offsets[partition] = 0 + # Register a cleanup handler + def cleanup(obj): + obj.stop() + self._cleanup_func = cleanup + atexit.register(cleanup, self) def fetch_last_known_offsets(self, partitions=None): if self.group is None: @@ -157,14 +163,30 @@ class Consumer(object): if self.count_since_commit >= self.auto_commit_every_n: self.commit() - def __del__(self): - self.stop() - def stop(self): if self.commit_timer is not None: self.commit_timer.stop() self.commit() + if hasattr(self, '_cleanup_func'): + # Remove cleanup handler now that we've stopped + + # py3 supports unregistering + if hasattr(atexit, 'unregister'): + atexit.unregister(self._cleanup_func) # pylint: disable=no-member + + # py2 requires removing from private attribute... + else: + + # ValueError on list.remove() if the exithandler no longer + # exists is fine here + try: + atexit._exithandlers.remove((self._cleanup_func, (self,), {})) + except ValueError: + pass + + del self._cleanup_func + def pending(self, partitions=None): """ Gets the pending message count |