summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-05-27 14:54:03 +0530
committerMahendra M <mahendra.m@gmail.com>2013-05-27 14:54:03 +0530
commit38215b66ebc70954c8448ce0397ac007ba35d697 (patch)
treefd60d36c6a26f45242d3414fa33d3806822caaaa
parent6327ba38bcc799b948a8f723ea2e6f078f8e90a8 (diff)
downloadkafka-python-38215b66ebc70954c8448ce0397ac007ba35d697.tar.gz
New API for checking pending message count
-rw-r--r--kafka/consumer.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f123113..603ea36 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -101,6 +101,30 @@ class SimpleConsumer(object):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ def pending(self, partitions=[]):
+ """
+ Gets the pending message count
+
+ partitions: list of partitions to check for, default is to check all
+ """
+ if len(partitions) == 0:
+ partitions = self.offsets.keys()
+
+ total = 0
+ reqs = []
+
+ for partition in partitions:
+ reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+
+ resps = self.client.send_offset_request(reqs)
+ for resp in resps:
+ partition = resp.partition
+ pending = resp.offsets[0]
+ offset = self.offsets[partition]
+ total += pending - offset - (1 if offset > 0 else 0)
+
+ return total
+
def commit(self, partitions=[]):
"""
Commit offsets for this consumer