summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@rd.io> 2016-01-02 13:18:10 -0800
committer: Dana Powers <dana.powers@rd.io> 2016-01-02 13:18:10 -0800
commit: 24a4c2a7c5a1265899316aca86a1149496d6564e (patch)
tree: a917f505c3bd05284c2ac7aefbf99da04cf77503 /kafka/consumer/fetcher.py
parent: 976970f89acfdb3582feed613722158004b0ff3e (diff)
download: kafka-python-24a4c2a7c5a1265899316aca86a1149496d6564e.tar.gz
Improve iterator interface
- Support single-message consumption via next(consumer) in py2/py3.
- Batch message methods (Fetcher.fetched_records / KafkaConsumer.poll) are incompatible with the iterator interface: the message generator holds messages in its internal state after they are popped from _records but before subscription_state is updated.
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py28
1 files changed, 22 insertions, 6 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 5e15424..ddf9d6f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -28,7 +28,7 @@ class RecordTooLargeError(Errors.KafkaError):
pass
-class Fetcher(object):
+class Fetcher(six.Iterator):
DEFAULT_CONFIG = {
'key_deserializer': None,
'value_deserializer': None,
@@ -79,6 +79,7 @@ class Fetcher(object):
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
+ self._iterator = None
#self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
@@ -253,7 +254,7 @@ class Fetcher(object):
def fetched_records(self):
"""Returns previously fetched records and updates consumed offsets.
- NOTE: returning empty records guarantees the consumed position are NOT updated.
+ Incompatible with iterator interface - use one or the other, not both.
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
@@ -263,10 +264,13 @@ class Fetcher(object):
configured max_partition_fetch_bytes
TopicAuthorizationError: if consumer is not authorized to fetch
messages from the topic
+ AssertionError: if used with iterator (incompatible)
Returns:
dict: {TopicPartition: deque([messages])}
"""
+ assert self._iterator is None, (
+ 'fetched_records is incompatible with message iterator')
if self._subscriptions.needs_partition_assignment:
return {}
@@ -324,7 +328,7 @@ class Fetcher(object):
key, value = self._deserialize(msg)
yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
- def __iter__(self):
+ def _message_generator(self):
"""Iterate over fetched_records"""
if self._subscriptions.needs_partition_assignment:
raise StopIteration('Subscription needs partition assignment')
@@ -342,7 +346,7 @@ class Fetcher(object):
# this can happen when a rebalance happened before
# fetched records are returned
log.warning("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
+ " since it is no longer assigned", tp)
continue
# note that the consumed position should always be available
@@ -352,7 +356,7 @@ class Fetcher(object):
# this can happen when a partition consumption paused before
# fetched records are returned
log.warning("Not returning fetched records for assigned partition"
- " %s since it is no longer fetchable", tp)
+ " %s since it is no longer fetchable", tp)
# we also need to reset the fetch positions to pretend we did
# not fetch this partition in the previous request at all
@@ -366,13 +370,25 @@ class Fetcher(object):
# these records aren't next in line based on the last consumed
# position, ignore them they must be from an obsolete request
log.warning("Ignoring fetched records for %s at offset %s",
- tp, fetch_offset)
+ tp, fetch_offset)
# Send any additional FetchRequests that we can now
# this will likely fetch each partition individually, rather than
# fetch multiple partitions in bulk when they are on the same broker
self.init_fetches()
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ if not self._iterator:
+ self._iterator = self._message_generator()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise
+
def _deserialize(self, msg):
if self.config['key_deserializer']:
key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable