diff options
author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-05 17:19:54 +0000 |
---|---|---|
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-07 09:34:09 +0000 |
commit | 1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5 (patch) | |
tree | b7521c2a10dedc958bdc506a3fc5d5ce420e4ba5 /test | |
parent | efc03d083d323e35a2d32bcbdbccc053f737836e (diff) | |
download | kafka-python-1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5.tar.gz |
Added `beginning_offsets` and `end_offsets` API's and fixed @jeffwidman review issues
Diffstat (limited to 'test')
-rw-r--r-- | test/test_consumer_integration.py | 47 |
1 files changed, 46 insertions, 1 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index eab93be..803b16a 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -12,7 +12,8 @@ from kafka import ( ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( - ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError + ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, + KafkaTimeoutError ) from kafka.structs import ( ProduceRequestPayload, TopicPartition, OffsetAndTimestamp @@ -666,6 +667,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(offsets[tp].offset, late_msg.offset) self.assertEqual(offsets[tp].timestamp, late_time) + offsets = consumer.offsets_for_times({}) + self.assertEqual(offsets, {}) + # Out of bound timestamps check offsets = consumer.offsets_for_times({tp: 0}) @@ -675,6 +679,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): offsets = consumer.offsets_for_times({tp: 9999999999999}) self.assertEqual(offsets[tp], None) + # Beginning/End offsets + + offsets = consumer.beginning_offsets([tp]) + self.assertEqual(offsets, { + tp: early_msg.offset, + }) + offsets = consumer.end_offsets([tp]) + self.assertEqual(offsets, { + tp: late_msg.offset + 1 + }) + @kafka_versions('>=0.10.1') def test_kafka_consumer_offsets_search_many_partitions(self): tp0 = TopicPartition(self.topic, 0) @@ -700,6 +715,18 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): tp1: OffsetAndTimestamp(p1msg.offset, send_time) }) + offsets = consumer.beginning_offsets([tp0, tp1]) + self.assertEqual(offsets, { + tp0: p0msg.offset, + tp1: p1msg.offset + }) + + offsets = consumer.end_offsets([tp0, tp1]) + self.assertEqual(offsets, { + tp0: p0msg.offset + 1, + tp1: p1msg.offset + 1 + }) + @kafka_versions('<0.10.1') def test_kafka_consumer_offsets_for_time_old(self): consumer = self.kafka_consumer() @@ -707,3 +734,21 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(UnsupportedVersionError): consumer.offsets_for_times({tp: int(time.time())}) + + with self.assertRaises(UnsupportedVersionError): + consumer.beginning_offsets([tp]) + + with self.assertRaises(UnsupportedVersionError): + consumer.end_offsets([tp]) + + @kafka_versions('<0.10.1') + def test_kafka_consumer_offsets_for_times_errors(self): + consumer = self.kafka_consumer() + tp = TopicPartition(self.topic, 0) + bad_tp = TopicPartition(self.topic, 100) + + with self.assertRaises(ValueError): + consumer.offsets_for_times({tp: -1}) + + with self.assertRaises(KafkaTimeoutError): + consumer.offsets_for_times({bad_tp: 0}) |