summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer.py20
-rw-r--r--test/integration.py21
2 files changed, 21 insertions, 20 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f41488f..603ea36 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -155,26 +155,6 @@ class SimpleConsumer(object):
assert resp.error == 0
self.count_since_commit = 0
- def get_messages(self, count=1):
- """
- Get the specified number of messages
-
- count: maximum number of messages to be fetched
- """
- if not hasattr(self, '_iterator'):
- self._iterator = iter(self)
-
- msgs = []
- while count > 0:
- try:
- msgs.append(self._iterator.next())
- count -= 1
- except StopIteration:
- delattr(self, '_iterator')
- break
-
- return msgs
-
def __iter__(self):
"""
Create an iterate per partition. Iterate through them calling next() until they are
diff --git a/test/integration.py b/test/integration.py
index 609cfc6..68e0e25 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -456,6 +456,27 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(len(all_messages), 13)
+ def test_pending(self):
+ # Produce 10 messages to partition 0 and 1
+
+ produce1 = ProduceRequest("test_pending", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(10)
+ ])
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ produce2 = ProduceRequest("test_pending", 1, messages=[
+ create_message("Test message 1 %d" % i) for i in range(10)
+ ])
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ consumer = SimpleConsumer(self.client, "group1", "test_pending")
+ self.assertEquals(consumer.pending(), 20)
+ self.assertEquals(consumer.pending(partitions=[0]), 10)
+ self.assertEquals(consumer.pending(partitions=[1]), 10)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)