diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 47 |
1 files changed, 24 insertions, 23 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 8ac28da..ef8fbda 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -3,13 +3,16 @@ from __future__ import absolute_import from itertools import izip_longest, repeat import logging import time +import numbers from threading import Lock from multiprocessing import Process, Queue as MPQueue, Event, Value from Queue import Empty, Queue +import kafka from kafka.common import ( - ErrorMapping, FetchRequest, + FetchRequest, OffsetRequest, OffsetCommitRequest, + OffsetFetchRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData ) @@ -80,6 +83,8 @@ class Consumer(object): if not partitions: partitions = self.client.topic_partitions[topic] + else: + assert all(isinstance(x, numbers.Integral) for x in partitions) # Variables for handling offset commits self.commit_lock = Lock() @@ -96,26 +101,22 @@ class Consumer(object): self.commit_timer.start() def get_or_init_offset_callback(resp): - if resp.error == ErrorMapping.NO_ERROR: + try: + kafka.common.check_error(resp) return resp.offset - elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: + except kafka.common.UnknownTopicOrPartitionError: return 0 - else: - raise Exception("OffsetFetchRequest for topic=%s, " - "partition=%d failed with errorcode=%s" % ( - resp.topic, resp.partition, resp.error)) - - # Uncomment for 0.8.1 - # - #for partition in partitions: - # req = OffsetFetchRequest(topic, partition) - # (offset,) = self.client.send_offset_fetch_request(group, [req], - # callback=get_or_init_offset_callback, - # fail_on_error=False) - # self.offsets[partition] = offset - for partition in partitions: - self.offsets[partition] = 0 + if auto_commit: + for partition in partitions: + req = OffsetFetchRequest(topic, partition) + (offset,) = self.client.send_offset_fetch_request(group, [req], + callback=get_or_init_offset_callback, + fail_on_error=False) + self.offsets[partition] = offset + else: + for partition in partitions: + self.offsets[partition] = 0 def commit(self, partitions=None): """ @@ -151,7 +152,7 @@ class Consumer(object): resps = self.client.send_offset_commit_request(self.group, reqs) for resp in resps: - assert resp.error == 0 + kafka.common.check_error(resp) self.count_since_commit = 0 @@ -164,7 +165,7 @@ class Consumer(object): if not self.auto_commit or self.auto_commit_every_n is None: return - if self.count_since_commit > self.auto_commit_every_n: + if self.count_since_commit >= self.auto_commit_every_n: self.commit() def stop(self): @@ -429,12 +430,12 @@ class SimpleConsumer(Consumer): # Put the message in our queue self.queue.put((partition, message)) self.fetch_offsets[partition] = message.offset + 1 - except ConsumerFetchSizeTooSmall, e: + except ConsumerFetchSizeTooSmall: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): log.error("Max fetch size %d too small", self.max_buffer_size) - raise e + raise if self.max_buffer_size is None: self.buffer_size *= 2 else: @@ -443,7 +444,7 @@ class SimpleConsumer(Consumer): log.warn("Fetch size too small, increase to %d (2x) " "and retry", self.buffer_size) retry_partitions.add(partition) - except ConsumerNoMoreData, e: + except ConsumerNoMoreData as e: log.debug("Iteration was ended by %r", e) except StopIteration: # Stop iterating through this partition |