diff options
-rw-r--r-- | kafka/consumer/new.py | 37 |
1 files changed, 21 insertions, 16 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index bad1f3d..e0884d3 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -54,9 +54,8 @@ class KafkaConsumer(object): for m in kafka: print m - # Alternate interface: next() - while True: - print kafka.next() + # Alternate interface: next() + print kafka.next() # Alternate interface: batch iteration while True: @@ -79,17 +78,18 @@ class KafkaConsumer(object): kafka.task_done(m) # Alternate interface: next() - while True: - m = kafka.next() - process_message(m) - kafka.task_done(m) + m = kafka.next() + process_message(m) + kafka.task_done(m) + + # If auto_commit_enable is False, remember to commit() periodically + kafka.commit() - # Batch process interface does not auto_commit! + # Batch process interface while True: for m in kafka.fetch_messages(): process_message(m) kafka.task_done(m) - kafka.commit() ``` messages (m) are namedtuples with attributes: @@ -97,7 +97,7 @@ class KafkaConsumer(object): m.partition: partition number (int) m.offset: message offset on topic-partition log (int) m.key: key (bytes - can be None) - m.value: message (output of deserializer_class - default is event object) + m.value: message (output of deserializer_class - default is raw bytes) Configuration settings can be passed to constructor, otherwise defaults will be used: @@ -110,12 +110,11 @@ class KafkaConsumer(object): metadata_broker_list=None, socket_timeout_ms=30*1000, auto_offset_reset='largest', - deserializer_class=Event.from_bytes, + deserializer_class=lambda msg: msg, auto_commit_enable=False, auto_commit_interval_ms=60 * 1000, consumer_timeout_ms=-1 - Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi """ @@ -137,7 +136,7 @@ class KafkaConsumer(object): metadata_broker_list=None, socket_timeout_ms=30*1000, auto_offset_reset='largest', - deserializer_class=Event.from_bytes, + deserializer_class=lambda msg: msg, auto_commit_enable=False, auto_commit_interval_ms=60 * 1000, auto_commit_interval_messages=None, @@ -345,9 +344,9 @@ class KafkaConsumer(object): # and send each group as a single FetchRequest to the correct broker try: responses = self._client.send_fetch_request(fetches, - max_wait_time=max_wait_time, - min_bytes=min_bytes, - fail_on_error=False) + max_wait_time=max_wait_time, + min_bytes=min_bytes, + fail_on_error=False) except FailedPayloadsError: logger.warning('FailedPayloadsError attempting to fetch data from kafka') self._refresh_metadata_on_error() @@ -537,6 +536,7 @@ class KafkaConsumer(object): # # Topic/partition management private methods # + def _consume_topic_partition(self, topic, partition): if not isinstance(topic, six.string_types): raise KafkaConfigurationError('Unknown topic type (%s) ' @@ -570,6 +570,7 @@ class KafkaConsumer(object): # # Offset-managment private methods # + def _get_commit_offsets(self): logger.info("Consumer fetching stored offsets") for topic_partition in self._topics: @@ -632,6 +633,7 @@ class KafkaConsumer(object): # # Consumer Timeout private methods # + def _set_consumer_timeout_start(self): self._consumer_timeout = False if self._config['consumer_timeout_ms'] >= 0: @@ -644,6 +646,7 @@ class KafkaConsumer(object): # # Autocommit private methods # + def _should_auto_commit(self): if not self._config['auto_commit_enable']: return False @@ -670,6 +673,7 @@ class KafkaConsumer(object): # # Message iterator private methods # + def __iter__(self): return self @@ -686,6 +690,7 @@ class KafkaConsumer(object): # # python private methods # + def __repr__(self): return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition for topic_partition in |