Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 93 |
1 files changed, 59 insertions, 34 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index d09803a..3b64571 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -16,6 +16,7 @@ log = logging.getLogger("kafka")
 AUTO_COMMIT_MSG_COUNT = 100
 AUTO_COMMIT_INTERVAL = 5000
 
+
 class SimpleConsumer(object):
     """
     A simple consumer implementation that consumes all partitions for a topic
@@ -25,13 +26,16 @@ class SimpleConsumer(object):
     topic: the topic to consume
     auto_commit: default True. Whether or not to auto commit the offsets
-    auto_commit_every_n: default 100. How many messages to consume before a commit
-    auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit
+    auto_commit_every_n: default 100. How many messages to consume
+                         before a commit
+    auto_commit_every_t: default 5000. How much time (in milliseconds) to
+                         wait before commit
 
     Auto commit details:
-    If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another
-    when one is triggered. These triggers simply call the commit method on this class. A
-    manual call to commit will also reset these triggers
+    If both auto_commit_every_n and auto_commit_every_t are set, they will
+    reset one another when one is triggered. These triggers simply call the
+    commit method on this class. A manual call to commit will also reset
+    these triggers
     """
     def __init__(self, client, group, topic, auto_commit=True,
@@ -63,17 +67,19 @@ class SimpleConsumer(object):
             elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
                 return 0
             else:
-                raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
-                    resp.topic, resp.partition, resp.error))
+                raise Exception("OffsetFetchRequest for topic=%s, "
+                                "partition=%d failed with errorcode=%s" % (
+                                resp.topic, resp.partition, resp.error))
 
         # Uncomment for 0.8.1
         #
         #for partition in self.client.topic_partitions[topic]:
         #    req = OffsetFetchRequest(topic, partition)
         #    (offset,) = self.client.send_offset_fetch_request(group, [req],
-        #                  callback=get_or_init_offset_callback, fail_on_error=False)
+        #                  callback=get_or_init_offset_callback,
+        #                  fail_on_error=False)
         #    self.offsets[partition] = offset
-
+
         for partition in self.client.topic_partitions[topic]:
             self.offsets[partition] = 0
@@ -87,14 +93,16 @@ class SimpleConsumer(object):
         1 is relative to the current offset
         2 is relative to the latest known offset (tail)
         """
-        if whence == 1: # relative to current position
+        if whence == 1:  # relative to current position
             for partition, _offset in self.offsets.items():
                 self.offsets[partition] = _offset + offset
-        elif whence in (0, 2): # relative to beginning or end
-            # divide the request offset by number of partitions, distribute the remained evenly
+        elif whence in (0, 2):  # relative to beginning or end
+            # divide the request offset by number of partitions,
+            # distribute the remained evenly
             (delta, rem) = divmod(offset, len(self.offsets))
             deltas = {}
-            for partition, r in izip_longest(self.offsets.keys(), repeat(1, rem), fillvalue=0):
+            for partition, r in izip_longest(self.offsets.keys(),
+                                             repeat(1, rem), fillvalue=0):
                 deltas[partition] = delta + r
 
             reqs = []
@@ -108,7 +116,8 @@ class SimpleConsumer(object):
             resps = self.client.send_offset_request(reqs)
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + deltas[resp.partition]
+                self.offsets[resp.partition] = resp.offsets[0] + \
+                    deltas[resp.partition]
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
@@ -149,24 +158,24 @@ class SimpleConsumer(object):
         """
         Commit offsets for this consumer
 
-        partitions: list of partitions to commit, default is to commit all of them
+        partitions: list of partitions to commit, default is to commit
+                    all of them
         """
-
         # short circuit if nothing happened
         if self.count_since_commit == 0:
             return
 
         with self.commit_lock:
             reqs = []
-            if len(partitions) == 0: # commit all partitions
+            if len(partitions) == 0:  # commit all partitions
                 partitions = self.offsets.keys()
 
             for partition in partitions:
                 offset = self.offsets[partition]
                 log.debug("Commit offset %d in SimpleConsumer: "
                           "group=%s, topic=%s, partition=%s" %
-                           (offset, self.group, self.topic, partition))
+                          (offset, self.group, self.topic, partition))
 
                 reqs.append(OffsetCommitRequest(self.topic, partition,
                                                 offset, None))
@@ -177,10 +186,27 @@ class SimpleConsumer(object):
 
             self.count_since_commit = 0
 
+    def _auto_commit(self):
+        """
+        Check if we have to commit based on number of messages and commit
+        """
+
+        # Check if we are supposed to do an auto-commit
+        if not self.auto_commit or self.auto_commit_every_n is None:
+            return
+
+        if self.count_since_commit > self.auto_commit_every_n:
+            if self.commit_timer is not None:
+                self.commit_timer.stop()
+                self.commit()
+                self.commit_timer.start()
+            else:
+                self.commit()
+
     def __iter__(self):
         """
-        Create an iterate per partition. Iterate through them calling next() until they are
-        all exhausted.
+        Create an iterate per partition. Iterate through them calling next()
+        until they are all exhausted.
         """
         iters = {}
         for partition, offset in self.offsets.items():
@@ -199,31 +225,30 @@ class SimpleConsumer(object):
             except StopIteration:
                 log.debug("Done iterating over partition %s" % partition)
                 del iters[partition]
-                continue # skip auto-commit since we didn't yield anything
 
-            # auto commit logic
+                # skip auto-commit since we didn't yield anything
+                continue
+
+            # Count, check and commit messages if necessary
             self.count_since_commit += 1
-            if self.auto_commit is True:
-                if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
-                    if self.commit_timer is not None:
-                        self.commit_timer.stop()
-                        self.commit()
-                        self.commit_timer.start()
-                    else:
-                        self.commit()
+            self._auto_commit()
 
     def __iter_partition__(self, partition, offset):
         """
-        Iterate over the messages in a partition. Create a FetchRequest to get back
-        a batch of messages, yield them one at a time. After a batch is exhausted,
-        start a new batch unless we've reached the end of ths partition.
+        Iterate over the messages in a partition. Create a FetchRequest
+        to get back a batch of messages, yield them one at a time.
+        After a batch is exhausted, start a new batch unless we've reached
+        the end of this partition.
         """
         while True:
-            req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
+            # TODO: configure fetch size
+            req = FetchRequest(self.topic, partition, offset, 1024)
            (resp,) = self.client.send_fetch_request([req])
+
             assert resp.topic == self.topic
             assert resp.partition == partition
+
             next_offset = None
             for message in resp.messages:
                 next_offset = message.offset
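Note: the least obvious logic touched by this diff is the offset redistribution in seek(): divmod() splits the requested offset into an even per-partition delta plus a remainder, and izip_longest() paired with repeat(1, rem) hands that remainder out one unit at a time to the first rem partitions. Below is a minimal standalone sketch of that idiom; the partition ids and offset value are made up for illustration, and only the stdlib calls mirror the code in the diff (Python 2, matching the izip_longest import used by kafka/consumer.py).

from itertools import izip_longest, repeat  # Python 2 itertools, as in consumer.py

partitions = [0, 1, 2, 3]   # hypothetical partition ids
offset = 10                 # hypothetical offset to spread across partitions

# Even share per partition, plus a remainder to hand out.
(delta, rem) = divmod(offset, len(partitions))

# repeat(1, rem) yields `rem` ones; izip_longest pads the rest with 0,
# so the first `rem` partitions each absorb one extra unit.
deltas = {}
for partition, r in izip_longest(partitions, repeat(1, rem), fillvalue=0):
    deltas[partition] = delta + r

print(deltas)   # {0: 3, 1: 3, 2: 2, 3: 2} -- the per-partition deltas sum back to 10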