summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-11 11:44:53 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:54 -0800
commitb264d8f51751f9fc81cfe8e0fef0606dd877a8db (patch)
treef21e792d8d7bfd23120ede182c9965b835c317f2
parent82b3e011fad44c92188ce7645738dea691fa5849 (diff)
downloadkafka-python-b264d8f51751f9fc81cfe8e0fef0606dd877a8db.tar.gz
add private methods _set_consumer_timeout_start() and _check_consumer_timeout()
-rw-r--r--kafka/consumer/new.py21
1 files changed, 13 insertions, 8 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index 5ef5921..b579bfc 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -86,7 +86,7 @@ class KafkaConsumer(object):
deserializer_class=Event.from_bytes,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
- consumer_timeout_ms=-1,
+ consumer_timeout_ms=-1
Configuration parameters are described in more detail at
@@ -110,7 +110,6 @@ class KafkaConsumer(object):
# Currently unused
'socket_receive_buffer_bytes': 64 * 1024,
- 'refresh_leader_backoff_ms': 200,
'num_consumer_fetchers': 1,
'default_fetcher_backoff_ms': 1000,
'queued_max_message_chunks': 10,
@@ -213,10 +212,7 @@ class KafkaConsumer(object):
return self
def next(self):
- consumer_timeout = False
- if self._get_config('consumer_timeout_ms') >= 0:
- consumer_timeout = time.time() + (self._get_config('consumer_timeout_ms') / 1000.0)
-
+ self._set_consumer_timeout_start()
while True:
# Check for auto-commit
@@ -230,8 +226,7 @@ class KafkaConsumer(object):
except StopIteration:
self._msg_iter = self.fetch_messages()
- if consumer_timeout and time.time() > consumer_timeout:
- raise ConsumerTimeout('Consumer timed out waiting to fetch messages')
+ self._check_consumer_timeout()
def offsets(self, group=None):
if not group:
@@ -446,6 +441,16 @@ class KafkaConsumer(object):
self.client.load_metadata_for_topics()
except KafkaUnavailableError:
logger.warning("Unable to refresh topic metadata... cluster unavailable")
+ self._check_consumer_timeout()
else:
logger.info("Topic metadata refreshed")
return
+
+ def _set_consumer_timeout_start(self):
+ self._consumer_timeout = False
+ if self._get_config('consumer_timeout_ms') >= 0:
+ self._consumer_timeout = time.time() + (self._get_config('consumer_timeout_ms') / 1000.0)
+
+ def _check_consumer_timeout(self):
+ if self._consumer_timeout and time.time() > self._consumer_timeout:
+ raise ConsumerTimeout('Consumer timed out after %d ms' % + self._get_config('consumer_timeout_ms'))