diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-05-27 14:54:03 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-05-27 14:54:03 +0530 |
commit | 38215b66ebc70954c8448ce0397ac007ba35d697 (patch) | |
tree | fd60d36c6a26f45242d3414fa33d3806822caaaa | |
parent | 6327ba38bcc799b948a8f723ea2e6f078f8e90a8 (diff) | |
download | kafka-python-38215b66ebc70954c8448ce0397ac007ba35d697.tar.gz |
New API for checking pending message count
-rw-r--r-- | kafka/consumer.py | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index f123113..603ea36 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -101,6 +101,30 @@ class SimpleConsumer(object): else: raise ValueError("Unexpected value for `whence`, %d" % whence) + def pending(self, partitions=[]): + """ + Gets the pending message count + + partitions: list of partitions to check for, default is to check all + """ + if len(partitions) == 0: + partitions = self.offsets.keys() + + total = 0 + reqs = [] + + for partition in partitions: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + resps = self.client.send_offset_request(reqs) + for resp in resps: + partition = resp.partition + pending = resp.offsets[0] + offset = self.offsets[partition] + total += pending - offset - (1 if offset > 0 else 0) + + return total + def commit(self, partitions=[]): """ Commit offsets for this consumer |