diff options
-rw-r--r-- | kafka/consumer.py | 20 | ||||
-rw-r--r-- | test/integration.py | 21 |
2 files changed, 21 insertions, 20 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index f41488f..603ea36 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -155,26 +155,6 @@ class SimpleConsumer(object): assert resp.error == 0 self.count_since_commit = 0 - def get_messages(self, count=1): - """ - Get the specified number of messages - - count: maximum number of messages to be fetched - """ - if not hasattr(self, '_iterator'): - self._iterator = iter(self) - - msgs = [] - while count > 0: - try: - msgs.append(self._iterator.next()) - count -= 1 - except StopIteration: - delattr(self, '_iterator') - break - - return msgs - def __iter__(self): """ Create an iterate per partition. Iterate through them calling next() until they are diff --git a/test/integration.py b/test/integration.py index 609cfc6..68e0e25 100644 --- a/test/integration.py +++ b/test/integration.py @@ -456,6 +456,27 @@ class TestConsumer(unittest.TestCase): self.assertEquals(len(all_messages), 13) + def test_pending(self): + # Produce 10 messages to partition 0 and 1 + + produce1 = ProduceRequest("test_pending", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + produce2 = ProduceRequest("test_pending", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumer = SimpleConsumer(self.client, "group1", "test_pending") + self.assertEquals(consumer.pending(), 20) + self.assertEquals(consumer.pending(partitions=[0]), 10) + self.assertEquals(consumer.pending(partitions=[1]), 10) if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) |