Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--  kafka/consumer.py | 93
1 file changed, 59 insertions(+), 34 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index d09803a..3b64571 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -16,6 +16,7 @@ log = logging.getLogger("kafka")
AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000
+
class SimpleConsumer(object):
"""
A simple consumer implementation that consumes all partitions for a topic
@@ -25,13 +26,16 @@ class SimpleConsumer(object):
topic: the topic to consume
auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit
+ auto_commit_every_n: default 100. How many messages to consume
+ before a commit
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to
+ wait before a commit
Auto commit details:
- If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another
- when one is triggered. These triggers simply call the commit method on this class. A
- manual call to commit will also reset these triggers
+ If both auto_commit_every_n and auto_commit_every_t are set, they will
+ reset one another when one is triggered. These triggers simply call the
+ commit method on this class. A manual call to commit will also reset
+ these triggers
"""
def __init__(self, client, group, topic, auto_commit=True,
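A hedged usage sketch of the auto-commit knobs documented above (the broker address, group, and topic names are hypothetical, and the KafkaClient constructor and import path follow the 0.8-era layout, which may differ in other versions):

from kafka.client import KafkaClient          # 0.8-era import path (assumption)
from kafka.consumer import SimpleConsumer

# Hypothetical broker and topic; adjust for your deployment.
client = KafkaClient("localhost", 9092)

# Commit after every 200 messages or every 10 seconds, whichever fires
# first; per the docstring, either trigger resets the other.
consumer = SimpleConsumer(client, "my-group", "my-topic",
                          auto_commit=True,
                          auto_commit_every_n=200,
                          auto_commit_every_t=10000)

for message in consumer:
    print(message)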
@@ -63,17 +67,19 @@ class SimpleConsumer(object):
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
return 0
else:
- raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
- resp.topic, resp.partition, resp.error))
+ raise Exception("OffsetFetchRequest for topic=%s, "
+ "partition=%d failed with errorcode=%s" % (
+ resp.topic, resp.partition, resp.error))
# Uncomment for 0.8.1
#
#for partition in self.client.topic_partitions[topic]:
# req = OffsetFetchRequest(topic, partition)
# (offset,) = self.client.send_offset_fetch_request(group, [req],
- # callback=get_or_init_offset_callback, fail_on_error=False)
+ # callback=get_or_init_offset_callback,
+ # fail_on_error=False)
# self.offsets[partition] = offset
-
+
for partition in self.client.topic_partitions[topic]:
self.offsets[partition] = 0
@@ -87,14 +93,16 @@ class SimpleConsumer(object):
1 is relative to the current offset
2 is relative to the latest known offset (tail)
"""
- if whence == 1: # relative to current position
+ if whence == 1: # relative to current position
for partition, _offset in self.offsets.items():
self.offsets[partition] = _offset + offset
- elif whence in (0, 2): # relative to beginning or end
- # divide the request offset by number of partitions, distribute the remained evenly
+ elif whence in (0, 2): # relative to beginning or end
+ # divide the request offset by the number of partitions,
+ # distribute the remainder evenly
(delta, rem) = divmod(offset, len(self.offsets))
deltas = {}
- for partition, r in izip_longest(self.offsets.keys(), repeat(1, rem), fillvalue=0):
+ for partition, r in izip_longest(self.offsets.keys(),
+ repeat(1, rem), fillvalue=0):
deltas[partition] = delta + r
reqs = []
@@ -108,7 +116,8 @@ class SimpleConsumer(object):
resps = self.client.send_offset_request(reqs)
for resp in resps:
- self.offsets[resp.partition] = resp.offsets[0] + deltas[resp.partition]
+ self.offsets[resp.partition] = resp.offsets[0] + \
+ deltas[resp.partition]
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
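The divmod/izip_longest dance above splits a requested seek distance across partitions, giving the leftover to the first partitions returned. A standalone sketch of that distribution (partition ids and the offset value are made up; izip_longest and repeat are the same Python 2 itertools helpers this module already uses):

from itertools import izip_longest, repeat    # Python 2, as in this module

offsets = {0: 0, 1: 0, 2: 0}    # three hypothetical partitions
offset = 10                     # total seek distance requested

delta, rem = divmod(offset, len(offsets))     # delta=3, rem=1
deltas = {}
for partition, r in izip_longest(offsets.keys(), repeat(1, rem), fillvalue=0):
    deltas[partition] = delta + r

# deltas now maps partitions to per-partition adjustments, e.g.
# {0: 4, 1: 3, 2: 3} -- the one leftover message goes to a single partition.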
@@ -149,24 +158,24 @@ class SimpleConsumer(object):
"""
Commit offsets for this consumer
- partitions: list of partitions to commit, default is to commit all of them
+ partitions: list of partitions to commit, default is to commit
+ all of them
"""
-
# short circuit if nothing happened
if self.count_since_commit == 0:
return
with self.commit_lock:
reqs = []
- if len(partitions) == 0: # commit all partitions
+ if len(partitions) == 0: # commit all partitions
partitions = self.offsets.keys()
for partition in partitions:
offset = self.offsets[partition]
log.debug("Commit offset %d in SimpleConsumer: "
"group=%s, topic=%s, partition=%s" %
- (offset, self.group, self.topic, partition))
+ (offset, self.group, self.topic, partition))
reqs.append(OffsetCommitRequest(self.topic, partition,
offset, None))
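Given the signature documented above, a commit can target all partitions or a subset. A brief hedged example (consumer is assumed to be a SimpleConsumer built as in the earlier sketch, and partition 0 is hypothetical):

# Commit the current offsets for every partition this consumer owns.
consumer.commit()

# Commit only partition 0; other partitions keep their pending offsets.
consumer.commit(partitions=[0])

# Either call resets count_since_commit, and therefore the auto-commit
# triggers described in the class docstring.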
@@ -177,10 +186,27 @@ class SimpleConsumer(object):
self.count_since_commit = 0
+ def _auto_commit(self):
+ """
+ Check if we have to commit based on the number of messages consumed, and commit if so
+ """
+
+ # Check if we are supposed to do an auto-commit
+ if not self.auto_commit or self.auto_commit_every_n is None:
+ return
+
+ if self.count_since_commit > self.auto_commit_every_n:
+ if self.commit_timer is not None:
+ self.commit_timer.stop()
+ self.commit()
+ self.commit_timer.start()
+ else:
+ self.commit()
+
def __iter__(self):
"""
- Create an iterate per partition. Iterate through them calling next() until they are
- all exhausted.
+ Create an iterator per partition. Iterate through them calling next()
+ until they are all exhausted.
"""
iters = {}
for partition, offset in self.offsets.items():
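The new _auto_commit helper above carries the count-based trigger: once enough messages have gone by, it stops the interval timer, commits, and restarts the timer so the two triggers keep resetting each other. A toy, self-contained illustration of that interplay, using threading.Timer in place of the project's own timer class (names such as AutoCommitDemo and record_message are invented for the demo):

import threading

class AutoCommitDemo(object):
    def __init__(self, every_n=100, every_t_ms=5000):
        self.every_n = every_n
        self.every_t = every_t_ms / 1000.0
        self.count_since_commit = 0
        self._schedule()

    def _schedule(self):
        # Time-based trigger: commit every_t ms after the last commit.
        self.timer = threading.Timer(self.every_t, self.commit)
        self.timer.daemon = True
        self.timer.start()

    def commit(self):
        print("committing %d messages" % self.count_since_commit)
        self.count_since_commit = 0
        self.timer.cancel()      # committing resets the time-based trigger
        self._schedule()

    def record_message(self):
        # Count-based trigger, mirroring _auto_commit above.
        self.count_since_commit += 1
        if self.count_since_commit > self.every_n:
            self.commit()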
@@ -199,31 +225,30 @@ class SimpleConsumer(object):
except StopIteration:
log.debug("Done iterating over partition %s" % partition)
del iters[partition]
- continue # skip auto-commit since we didn't yield anything
- # auto commit logic
+ # skip auto-commit since we didn't yield anything
+ continue
+
+ # Count, check and commit messages if necessary
self.count_since_commit += 1
- if self.auto_commit is True:
- if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
- if self.commit_timer is not None:
- self.commit_timer.stop()
- self.commit()
- self.commit_timer.start()
- else:
- self.commit()
+ self._auto_commit()
def __iter_partition__(self, partition, offset):
"""
- Iterate over the messages in a partition. Create a FetchRequest to get back
- a batch of messages, yield them one at a time. After a batch is exhausted,
- start a new batch unless we've reached the end of ths partition.
+ Iterate over the messages in a partition. Create a FetchRequest
+ to get back a batch of messages, yield them one at a time.
+ After a batch is exhausted, start a new batch unless we've reached
+ the end of this partition.
"""
while True:
- req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
+ # TODO: configure fetch size
+ req = FetchRequest(self.topic, partition, offset, 1024)
(resp,) = self.client.send_fetch_request([req])
+
assert resp.topic == self.topic
assert resp.partition == partition
+
next_offset = None
for message in resp.messages:
next_offset = message.offset
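For reference, the overall shape of the loop being reformatted here can be sketched as a standalone generator. The FetchRequest fields and the 1024-byte fetch size come from the hunk above; the import path follows the 0.8-era module layout, and the termination and offset-advance logic beyond the end of this excerpt are assumptions rather than a copy of the module's code:

from kafka.common import FetchRequest    # 0.8-era import path (assumption)

def iter_partition(client, topic, partition, offset, fetch_size=1024):
    """Yield messages from one partition, one fetch batch at a time."""
    while True:
        req = FetchRequest(topic, partition, offset, fetch_size)
        (resp,) = client.send_fetch_request([req])

        next_offset = None
        for message in resp.messages:
            next_offset = message.offset
            yield message

        if next_offset is None:       # empty batch: assume the tail was reached
            break
        offset = next_offset + 1      # continue after the last yielded message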