diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-02 13:18:10 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-02 13:18:10 -0800 |
commit | 24a4c2a7c5a1265899316aca86a1149496d6564e (patch) | |
tree | a917f505c3bd05284c2ac7aefbf99da04cf77503 /kafka/consumer/fetcher.py | |
parent | 976970f89acfdb3582feed613722158004b0ff3e (diff) | |
download | kafka-python-24a4c2a7c5a1265899316aca86a1149496d6564e.tar.gz |
Improve iterator interface
- Support single message consumption via next(consumer) in py2/py3
- batch message methods (Fetcher.fetched_records / KafkaConsumer.poll)
are incompatible with iterators -- message generator state keeps
messages internally after they are popped from _records, but before
subscription_state is updated.
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 5e15424..ddf9d6f 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -28,7 +28,7 @@ class RecordTooLargeError(Errors.KafkaError): pass -class Fetcher(object): +class Fetcher(six.Iterator): DEFAULT_CONFIG = { 'key_deserializer': None, 'value_deserializer': None, @@ -79,6 +79,7 @@ class Fetcher(object): self._unauthorized_topics = set() self._offset_out_of_range_partitions = dict() # {topic_partition: offset} self._record_too_large_partitions = dict() # {topic_partition: offset} + self._iterator = None #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) @@ -253,7 +254,7 @@ class Fetcher(object): def fetched_records(self): """Returns previously fetched records and updates consumed offsets. - NOTE: returning empty records guarantees the consumed position are NOT updated. + Incompatible with iterator interface - use one or the other, not both. Raises: OffsetOutOfRangeError: if no subscription offset_reset_strategy @@ -263,10 +264,13 @@ class Fetcher(object): configured max_partition_fetch_bytes TopicAuthorizationError: if consumer is not authorized to fetch messages from the topic + AssertionError: if used with iterator (incompatible) Returns: dict: {TopicPartition: deque([messages])} """ + assert self._iterator is None, ( + 'fetched_records is incompatible with message iterator') if self._subscriptions.needs_partition_assignment: return {} @@ -324,7 +328,7 @@ class Fetcher(object): key, value = self._deserialize(msg) yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) - def __iter__(self): + def _message_generator(self): """Iterate over fetched_records""" if self._subscriptions.needs_partition_assignment: raise StopIteration('Subscription needs partition assignment') @@ -342,7 +346,7 @@ class Fetcher(object): # this can happen when a rebalance happened before # fetched records are returned log.warning("Not returning fetched records for partition %s" - " since it is no longer assigned", tp) + " since it is no longer assigned", tp) continue # note that the consumed position should always be available @@ -352,7 +356,7 @@ class Fetcher(object): # this can happen when a partition consumption paused before # fetched records are returned log.warning("Not returning fetched records for assigned partition" - " %s since it is no longer fetchable", tp) + " %s since it is no longer fetchable", tp) # we also need to reset the fetch positions to pretend we did # not fetch this partition in the previous request at all @@ -366,13 +370,25 @@ class Fetcher(object): # these records aren't next in line based on the last consumed # position, ignore them they must be from an obsolete request log.warning("Ignoring fetched records for %s at offset %s", - tp, fetch_offset) + tp, fetch_offset) # Send any additional FetchRequests that we can now # this will likely fetch each partition individually, rather than # fetch multiple partitions in bulk when they are on the same broker self.init_fetches() + def __iter__(self): + return self + + def __next__(self): + if not self._iterator: + self._iterator = self._message_generator() + try: + return next(self._iterator) + except StopIteration: + self._iterator = None + raise + def _deserialize(self, msg): if self.config['key_deserializer']: key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable |