diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-02 13:18:10 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-02 13:18:10 -0800 |
commit | 24a4c2a7c5a1265899316aca86a1149496d6564e (patch) | |
tree | a917f505c3bd05284c2ac7aefbf99da04cf77503 /kafka/consumer/group.py | |
parent | 976970f89acfdb3582feed613722158004b0ff3e (diff) | |
download | kafka-python-24a4c2a7c5a1265899316aca86a1149496d6564e.tar.gz |
Improve iterator interface
- Support single message consumption via next(consumer) in py2/py3
- batch message methods (Fetcher.fetched_records / KafkaConsumer.poll)
are incompatible with iterators -- message generator state keeps
messages internally after they are popped from _records, but before
subscription_state is updated.
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 5278214..cea2e1c 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -4,6 +4,8 @@ import copy import logging import time +import six + from kafka.client_async import KafkaClient from kafka.consumer.fetcher import Fetcher from kafka.consumer.subscription_state import SubscriptionState @@ -15,7 +17,7 @@ from kafka.version import __version__ log = logging.getLogger(__name__) -class KafkaConsumer(object): +class KafkaConsumer(six.Iterator): """Consumer for Kafka 0.9""" DEFAULT_CONFIG = { 'bootstrap_servers': 'localhost', @@ -160,6 +162,7 @@ class KafkaConsumer(object): assignors=self.config['partition_assignment_strategy'], **self.config) self._closed = False + self._iterator = None #self.metrics = None if topics: @@ -324,16 +327,16 @@ class KafkaConsumer(object): return self._client.cluster.partitions_for_topic(topic) def poll(self, timeout_ms=0): - """ - Fetch data for the topics or partitions specified using one of the - subscribe/assign APIs. It is an error to not have subscribed to any - topics or partitions before polling for data. + """Fetch data from assigned topics / partitions. + Records are fetched and returned in batches by topic-partition. On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(partition, offset) or automatically set as the last committed offset for the subscribed list of partitions. + Incompatible with iterator interface -- use one or the other, not both. + Arguments: timeout_ms (int, optional): milliseconds to spend waiting in poll if data is not available. If 0, returns immediately with any @@ -344,6 +347,7 @@ class KafkaConsumer(object): subscribed list of topics and partitions """ assert timeout_ms >= 0, 'Timeout must not be negative' + assert self._iterator is None, 'Incompatible with iterator interface' # poll for new data until the timeout expires start = time.time() @@ -564,7 +568,7 @@ class KafkaConsumer(object): # then do any offset lookups in case some positions are not known self._fetcher.update_fetch_positions(partitions) - def __iter__(self): + def _message_generator(self): while True: self._coordinator.ensure_coordinator_known() @@ -585,3 +589,15 @@ class KafkaConsumer(object): yield msg if time.time() > timeout: break + + def __iter__(self): + return self + + def __next__(self): + if not self._iterator: + self._iterator = self._message_generator() + try: + return next(self._iterator) + except StopIteration: + self._iterator = None + raise |