diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client_integration.py | 16 | ||||
-rw-r--r-- | test/test_codec.py | 8 | ||||
-rw-r--r-- | test/test_conn.py | 6 | ||||
-rw-r--r-- | test/test_consumer.py | 7 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 159 | ||||
-rw-r--r-- | test/test_failover_integration.py | 2 | ||||
-rw-r--r-- | test/test_package.py | 22 | ||||
-rw-r--r-- | test/test_producer_integration.py | 20 |
8 files changed, 177 insertions, 63 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 0cd2c9e..cc60778 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -32,12 +32,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp, = self.client.send_fetch_request([fetch]) - self.assertEquals(fetch_resp.error, 0) - self.assertEquals(fetch_resp.topic, self.topic) - self.assertEquals(fetch_resp.partition, 0) + self.assertEqual(fetch_resp.error, 0) + self.assertEqual(fetch_resp.topic, self.topic) + self.assertEqual(fetch_resp.partition, 0) messages = list(fetch_resp.messages) - self.assertEquals(len(messages), 0) + self.assertEqual(len(messages), 0) @kafka_versions("all") def test_ensure_topic_exists(self): @@ -58,10 +58,10 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): def test_commit_fetch_offsets(self): req = OffsetCommitRequest(self.topic, 0, 42, b"metadata") (resp,) = self.client.send_offset_commit_request(b"group", [req]) - self.assertEquals(resp.error, 0) + self.assertEqual(resp.error, 0) req = OffsetFetchRequest(self.topic, 0) (resp,) = self.client.send_offset_fetch_request(b"group", [req]) - self.assertEquals(resp.error, 0) - self.assertEquals(resp.offset, 42) - self.assertEquals(resp.metadata, b"") # Metadata isn't stored for now + self.assertEqual(resp.error, 0) + self.assertEqual(resp.offset, 42) + self.assertEqual(resp.metadata, b"") # Metadata isn't stored for now diff --git a/test/test_codec.py b/test/test_codec.py index 0ea1074..2d7670a 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -15,14 +15,14 @@ class TestCodec(unittest.TestCase): for i in xrange(1000): s1 = random_string(100) s2 = gzip_decode(gzip_encode(s1)) - self.assertEquals(s1, s2) + self.assertEqual(s1, s2) @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy(self): for i in xrange(1000): s1 = random_string(100) s2 = snappy_decode(snappy_encode(s1)) - self.assertEquals(s1, s2) + self.assertEqual(s1, s2) @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy_detect_xerial(self): @@ -53,7 +53,7 @@ class TestCodec(unittest.TestCase): + struct.pack('!i', block_len) + random_snappy \ + struct.pack('!i', block_len2) + random_snappy2 \ - self.assertEquals(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50)) + self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50)) @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy_encode_xerial(self): @@ -68,5 +68,5 @@ class TestCodec(unittest.TestCase): to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50) compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) - self.assertEquals(compressed, to_ensure) + self.assertEqual(compressed, to_ensure) diff --git a/test/test_conn.py b/test/test_conn.py index 7b3beb7..2c8f3b2 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -120,7 +120,7 @@ class ConnTest(unittest.TestCase): def test_recv(self): - self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload']) + self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) def test_recv__reconnects_on_dirty_conn(self): @@ -151,8 +151,8 @@ class ConnTest(unittest.TestCase): def test_recv__doesnt_consume_extra_data_in_stream(self): # Here just test that each call to recv will return a single payload - self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload']) - self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload2']) + self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) + self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2']) def test_close__object_is_reusable(self): diff --git a/test/test_consumer.py b/test/test_consumer.py index 9060919..7b8f370 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -2,9 +2,14 @@ from mock import MagicMock from . import unittest -from kafka.consumer import SimpleConsumer +from kafka import SimpleConsumer, KafkaConsumer +from kafka.common import KafkaConfigurationError class TestKafkaConsumer(unittest.TestCase): def test_non_integer_partitions(self): with self.assertRaises(AssertionError): SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ]) + + def test_broker_list_required(self): + with self.assertRaises(KafkaConfigurationError): + KafkaConsumer() diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 2762008..ea32318 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,9 +1,12 @@ +import logging import os from six.moves import xrange -from kafka import SimpleConsumer, MultiProcessConsumer, create_message -from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall +from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message +from kafka.common import ( + ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout +) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from test.fixtures import ZookeeperFixture, KafkaFixture @@ -36,16 +39,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = [ create_message(self.msg(str(msg))) for msg in messages ] produce = ProduceRequest(self.topic, partition, messages = messages) resp, = self.client.send_produce_request([produce]) - self.assertEquals(resp.error, 0) + self.assertEqual(resp.error, 0) return [ x.value for x in messages ] def assert_message_count(self, messages, num_messages): # Make sure we got them all - self.assertEquals(len(messages), num_messages) + self.assertEqual(len(messages), num_messages) # Make sure there are no duplicates - self.assertEquals(len(set(messages)), num_messages) + self.assertEqual(len(set(messages)), num_messages) + + def consumer(self, **kwargs): + if os.environ['KAFKA_VERSION'] == "0.8.0": + # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off + kwargs['auto_commit'] = False + else: + kwargs.setdefault('auto_commit', True) + + consumer_class = kwargs.pop('consumer', SimpleConsumer) + group = kwargs.pop('group', self.id().encode('utf-8')) + topic = kwargs.pop('topic', self.topic) + + if consumer_class == SimpleConsumer: + kwargs.setdefault('iter_timeout', 0) + + return consumer_class(self.client, group, topic, **kwargs) + + def kafka_consumer(self, **configs): + brokers = '%s:%d' % (self.server.host, self.server.port) + consumer = KafkaConsumer(self.topic, + metadata_broker_list=brokers, + **configs) + return consumer @kafka_versions("all") def test_simple_consumer(self): @@ -114,9 +140,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) - self.assertEquals(consumer.pending(), 20) - self.assertEquals(consumer.pending(partitions=[0]), 10) - self.assertEquals(consumer.pending(partitions=[1]), 10) + consumer = self.consumer() + + self.assertEqual(consumer.pending(), 20) + self.assertEqual(consumer.pending(partitions=[0]), 10) + self.assertEqual(consumer.pending(partitions=[1]), 10) # move to last message, so one partition should have 1 pending # message and other 0 @@ -175,9 +203,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False) - self.assertEquals(consumer.pending(), 20) - self.assertEquals(consumer.pending(partitions=[0]), 10) - self.assertEquals(consumer.pending(partitions=[1]), 10) + self.assertEqual(consumer.pending(), 20) + self.assertEqual(consumer.pending(partitions=[0]), 10) + self.assertEqual(consumer.pending(partitions=[1]), 10) consumer.stop() @@ -225,7 +253,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Consume giant message successfully message = big_consumer.get_message(block=False, timeout=10) self.assertIsNotNone(message) - self.assertEquals(message.message.value, huge_message) + self.assertEqual(message.message.value, huge_message) big_consumer.stop() @@ -273,20 +301,101 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer = self.consumer(buffer_size=1024, max_buffer_size=2048) messages = [ message for message in consumer ] - self.assertEquals(len(messages), 2) + self.assertEqual(len(messages), 2) - def consumer(self, **kwargs): - if os.environ['KAFKA_VERSION'] == "0.8.0": - # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off - kwargs['auto_commit'] = False - else: - kwargs.setdefault('auto_commit', True) + @kafka_versions("all") + def test_kafka_consumer(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) - consumer_class = kwargs.pop('consumer', SimpleConsumer) - group = kwargs.pop('group', self.id().encode('utf-8')) - topic = kwargs.pop('topic', self.topic) + # Start a consumer + consumer = self.kafka_consumer(auto_offset_reset='smallest', + consumer_timeout_ms=5000) + n = 0 + messages = {0: set(), 1: set()} + logging.debug("kafka consumer offsets: %s" % consumer.offsets()) + for m in consumer: + logging.debug("Consumed message %s" % repr(m)) + n += 1 + messages[m.partition].add(m.offset) + if n >= 200: + break + + self.assertEqual(len(messages[0]), 100) + self.assertEqual(len(messages[1]), 100) - if consumer_class == SimpleConsumer: - kwargs.setdefault('iter_timeout', 0) + @kafka_versions("all") + def test_kafka_consumer__blocking(self): + TIMEOUT_MS = 500 + consumer = self.kafka_consumer(auto_offset_reset='smallest', + consumer_timeout_ms=TIMEOUT_MS) - return consumer_class(self.client, group, topic, **kwargs) + # Ask for 5 messages, nothing in queue, block 5 seconds + with Timer() as t: + with self.assertRaises(ConsumerTimeout): + msg = consumer.next() + self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) + + self.send_messages(0, range(0, 10)) + + # Ask for 5 messages, 10 in queue. Get 5 back, no blocking + messages = set() + with Timer() as t: + for i in range(5): + msg = consumer.next() + messages.add((msg.partition, msg.offset)) + self.assertEqual(len(messages), 5) + self.assertLess(t.interval, TIMEOUT_MS / 1000.0 ) + + # Ask for 10 messages, get 5 back, block 5 seconds + messages = set() + with Timer() as t: + with self.assertRaises(ConsumerTimeout): + for i in range(10): + msg = consumer.next() + messages.add((msg.partition, msg.offset)) + self.assertEqual(len(messages), 5) + self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) + + @kafka_versions("0.8.1", "0.8.1.1") + def test_kafka_consumer__offset_commit_resume(self): + GROUP_ID = random_string(10) + + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Start a consumer + consumer1 = self.kafka_consumer( + group_id = GROUP_ID, + auto_commit_enable = True, + auto_commit_interval_ms = None, + auto_commit_interval_messages = 20, + auto_offset_reset='smallest', + ) + + # Grab the first 195 messages + output_msgs1 = [] + for _ in xrange(195): + m = consumer1.next() + output_msgs1.append(m) + consumer1.task_done(m) + self.assert_message_count(output_msgs1, 195) + + # The total offset across both partitions should be at 180 + consumer2 = self.kafka_consumer( + group_id = GROUP_ID, + auto_commit_enable = True, + auto_commit_interval_ms = None, + auto_commit_interval_messages = 20, + consumer_timeout_ms = 100, + auto_offset_reset='smallest', + ) + + # 181-200 + output_msgs2 = [] + with self.assertRaises(ConsumerTimeout): + while True: + m = consumer2.next() + output_msgs2.append(m) + self.assert_message_count(output_msgs2, 20) + self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15) diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index d307d41..ca71f2d 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -121,7 +121,7 @@ class TestFailover(KafkaIntegrationTestCase): logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j) resp = producer.send_messages(topic, partition, random_string(10)) if len(resp) > 0: - self.assertEquals(resp[0].error, 0) + self.assertEqual(resp[0].error, 0) logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j) def _kill_leader(self, topic, partition): diff --git a/test/test_package.py b/test/test_package.py index 9b69a7c..e91753c 100644 --- a/test/test_package.py +++ b/test/test_package.py @@ -3,27 +3,27 @@ from . import unittest class TestPackage(unittest.TestCase): def test_top_level_namespace(self): import kafka as kafka1 - self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient") - self.assertEquals(kafka1.client.__name__, "kafka.client") - self.assertEquals(kafka1.codec.__name__, "kafka.codec") + self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient") + self.assertEqual(kafka1.client.__name__, "kafka.client") + self.assertEqual(kafka1.codec.__name__, "kafka.codec") def test_submodule_namespace(self): import kafka.client as client1 - self.assertEquals(client1.__name__, "kafka.client") - self.assertEquals(client1.KafkaClient.__name__, "KafkaClient") + self.assertEqual(client1.__name__, "kafka.client") + self.assertEqual(client1.KafkaClient.__name__, "KafkaClient") from kafka import client as client2 - self.assertEquals(client2.__name__, "kafka.client") - self.assertEquals(client2.KafkaClient.__name__, "KafkaClient") + self.assertEqual(client2.__name__, "kafka.client") + self.assertEqual(client2.KafkaClient.__name__, "KafkaClient") from kafka.client import KafkaClient as KafkaClient1 - self.assertEquals(KafkaClient1.__name__, "KafkaClient") + self.assertEqual(KafkaClient1.__name__, "KafkaClient") from kafka.codec import gzip_encode as gzip_encode1 - self.assertEquals(gzip_encode1.__name__, "gzip_encode") + self.assertEqual(gzip_encode1.__name__, "gzip_encode") from kafka import KafkaClient as KafkaClient2 - self.assertEquals(KafkaClient2.__name__, "KafkaClient") + self.assertEqual(KafkaClient2.__name__, "KafkaClient") from kafka.codec import snappy_encode - self.assertEquals(snappy_encode.__name__, "snappy_encode") + self.assertEqual(snappy_encode.__name__, "snappy_encode") diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index d68af72..4331d23 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -251,7 +251,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) resp = producer.send_messages(self.topic, self.msg("one")) - self.assertEquals(len(resp), 0) + self.assertEqual(len(resp), 0) self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) producer.stop() @@ -301,7 +301,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): ) # Batch mode is async. No ack - self.assertEquals(len(resp), 0) + self.assertEqual(len(resp), 0) # It hasn't sent yet self.assert_fetch_offset(0, start_offset0, []) @@ -314,7 +314,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): ) # Batch mode is async. No ack - self.assertEquals(len(resp), 0) + self.assertEqual(len(resp), 0) self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), @@ -350,7 +350,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): ) # Batch mode is async. No ack - self.assertEquals(len(resp), 0) + self.assertEqual(len(resp), 0) # It hasn't sent yet self.assert_fetch_offset(0, start_offset0, []) @@ -363,7 +363,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): ) # Batch mode is async. No ack - self.assertEquals(len(resp), 0) + self.assertEqual(len(resp), 0) # Wait the timeout out time.sleep(5) @@ -389,7 +389,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer = SimpleProducer(self.client, async=True) resp = producer.send_messages(self.topic, self.msg("one")) - self.assertEquals(len(resp), 0) + self.assertEqual(len(resp), 0) self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) @@ -402,7 +402,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True) resp = producer.send(self.topic, self.key("key1"), self.msg("one")) - self.assertEquals(len(resp), 0) + self.assertEqual(len(resp), 0) self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) @@ -429,9 +429,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ]) - self.assertEquals(resp.error, 0) - self.assertEquals(resp.partition, partition) + self.assertEqual(resp.error, 0) + self.assertEqual(resp.partition, partition) messages = [ x.message.value for x in resp.messages ] self.assertEqual(messages, expected_messages) - self.assertEquals(resp.highwaterMark, start_offset+len(expected_messages)) + self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages)) |