diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-11 11:44:53 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:54 -0800 |
commit | b264d8f51751f9fc81cfe8e0fef0606dd877a8db (patch) | |
tree | f21e792d8d7bfd23120ede182c9965b835c317f2 | |
parent | 82b3e011fad44c92188ce7645738dea691fa5849 (diff) | |
download | kafka-python-b264d8f51751f9fc81cfe8e0fef0606dd877a8db.tar.gz |
add private methods _set_consumer_timeout_start() and _check_consumer_timeout()
-rw-r--r-- | kafka/consumer/new.py | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index 5ef5921..b579bfc 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -86,7 +86,7 @@ class KafkaConsumer(object): deserializer_class=Event.from_bytes, auto_commit_enable=False, auto_commit_interval_ms=60 * 1000, - consumer_timeout_ms=-1, + consumer_timeout_ms=-1 Configuration parameters are described in more detail at @@ -110,7 +110,6 @@ class KafkaConsumer(object): # Currently unused 'socket_receive_buffer_bytes': 64 * 1024, - 'refresh_leader_backoff_ms': 200, 'num_consumer_fetchers': 1, 'default_fetcher_backoff_ms': 1000, 'queued_max_message_chunks': 10, @@ -213,10 +212,7 @@ class KafkaConsumer(object): return self def next(self): - consumer_timeout = False - if self._get_config('consumer_timeout_ms') >= 0: - consumer_timeout = time.time() + (self._get_config('consumer_timeout_ms') / 1000.0) - + self._set_consumer_timeout_start() while True: # Check for auto-commit @@ -230,8 +226,7 @@ class KafkaConsumer(object): except StopIteration: self._msg_iter = self.fetch_messages() - if consumer_timeout and time.time() > consumer_timeout: - raise ConsumerTimeout('Consumer timed out waiting to fetch messages') + self._check_consumer_timeout() def offsets(self, group=None): if not group: @@ -446,6 +441,16 @@ class KafkaConsumer(object): self.client.load_metadata_for_topics() except KafkaUnavailableError: logger.warning("Unable to refresh topic metadata... cluster unavailable") + self._check_consumer_timeout() else: logger.info("Topic metadata refreshed") return + + def _set_consumer_timeout_start(self): + self._consumer_timeout = False + if self._get_config('consumer_timeout_ms') >= 0: + self._consumer_timeout = time.time() + (self._get_config('consumer_timeout_ms') / 1000.0) + + def _check_consumer_timeout(self): + if self._consumer_timeout and time.time() > self._consumer_timeout: + raise ConsumerTimeout('Consumer timed out after %d ms' % + self._get_config('consumer_timeout_ms')) |