Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--  kafka/consumer.py  |  47
1 file changed, 24 insertions(+), 23 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 8ac28da..ef8fbda 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -3,13 +3,16 @@ from __future__ import absolute_import
from itertools import izip_longest, repeat
import logging
import time
+import numbers
from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
+import kafka
from kafka.common import (
- ErrorMapping, FetchRequest,
+ FetchRequest,
OffsetRequest, OffsetCommitRequest,
+ OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)
@@ -80,6 +83,8 @@ class Consumer(object):
if not partitions:
partitions = self.client.topic_partitions[topic]
+ else:
+ assert all(isinstance(x, numbers.Integral) for x in partitions)
# Variables for handling offset commits
self.commit_lock = Lock()
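The new assertion catches partition ids passed as the wrong type (e.g. strings read from a config file) at construction time instead of deep inside a fetch. A minimal sketch of the check, using only the stdlib numbers module; numbers.Integral matches int and, on Python 2, long:

    import numbers

    def validate_partitions(partitions):
        # Hypothetical helper mirroring the new assert: accept any
        # integral type, reject strings and floats up front.
        assert all(isinstance(p, numbers.Integral) for p in partitions)
        return partitions

    validate_partitions([0, 1, 2])      # passes
    # validate_partitions(['0', '1'])   # would raise AssertionError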
@@ -96,26 +101,22 @@ class Consumer(object):
self.commit_timer.start()
def get_or_init_offset_callback(resp):
- if resp.error == ErrorMapping.NO_ERROR:
+ try:
+ kafka.common.check_error(resp)
return resp.offset
- elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+ except kafka.common.UnknownTopicOrPartitionError:
return 0
- else:
- raise Exception("OffsetFetchRequest for topic=%s, "
- "partition=%d failed with errorcode=%s" % (
- resp.topic, resp.partition, resp.error))
-
- # Uncomment for 0.8.1
- #
- #for partition in partitions:
- # req = OffsetFetchRequest(topic, partition)
- # (offset,) = self.client.send_offset_fetch_request(group, [req],
- # callback=get_or_init_offset_callback,
- # fail_on_error=False)
- # self.offsets[partition] = offset
- for partition in partitions:
- self.offsets[partition] = 0
+ if auto_commit:
+ for partition in partitions:
+ req = OffsetFetchRequest(topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(group, [req],
+ callback=get_or_init_offset_callback,
+ fail_on_error=False)
+ self.offsets[partition] = offset
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
def commit(self, partitions=None):
"""
@@ -151,7 +152,7 @@ class Consumer(object):
resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
- assert resp.error == 0
+ kafka.common.check_error(resp)
self.count_since_commit = 0
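Worth noting: the old "assert resp.error == 0" is compiled away entirely under python -O, so a failed commit would pass silently, and otherwise it raises only a bare AssertionError; check_error raises a named, descriptive exception regardless of interpreter flags.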
@@ -164,7 +165,7 @@ class Consumer(object):
if not self.auto_commit or self.auto_commit_every_n is None:
return
- if self.count_since_commit > self.auto_commit_every_n:
+ if self.count_since_commit >= self.auto_commit_every_n:
self.commit()
def stop(self):
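This fixes an off-by-one in the auto-commit threshold: with the old strict comparison and auto_commit_every_n = 100, the commit only fired on the 101st message. A small illustration, assuming count_since_commit is bumped once per consumed message:

    auto_commit_every_n = 100
    count_since_commit = 100   # the 100th message was just consumed

    print(count_since_commit >  auto_commit_every_n)  # False: old test waits for message 101
    print(count_since_commit >= auto_commit_every_n)  # True:  new test commits after exactly every_n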
@@ -429,12 +430,12 @@ class SimpleConsumer(Consumer):
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall, e:
+ except ConsumerFetchSizeTooSmall:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
self.max_buffer_size)
- raise e
+ raise
if self.max_buffer_size is None:
self.buffer_size *= 2
else:
@@ -443,7 +444,7 @@ class SimpleConsumer(Consumer):
log.warn("Fetch size too small, increase to %d (2x) "
"and retry", self.buffer_size)
retry_partitions.add(partition)
- except ConsumerNoMoreData, e:
+ except ConsumerNoMoreData as e:
log.debug("Iteration was ended by %r", e)
except StopIteration:
# Stop iterating through this partition
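Two Python cleanups ride along in these hunks: "except ConsumerNoMoreData as e" replaces the comma syntax that Python 3 drops, and the bare raise re-raises ConsumerFetchSizeTooSmall with its original traceback (in Python 2, "raise e" would restart the traceback at the raise site). The retry policy itself doubles the fetch buffer until it hits max_buffer_size; a standalone sketch of that policy, with a hypothetical helper name:

    from kafka.common import ConsumerFetchSizeTooSmall

    def next_buffer_size(buffer_size, max_buffer_size):
        # Double the fetch buffer on each ConsumerFetchSizeTooSmall,
        # capped at max_buffer_size (None means unbounded growth),
        # mirroring the retry loop in the hunks above.
        if max_buffer_size is None:
            return buffer_size * 2
        if buffer_size >= max_buffer_size:
            raise ConsumerFetchSizeTooSmall(
                "max fetch size %d too small" % max_buffer_size)
        return min(buffer_size * 2, max_buffer_size)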