diff options
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 27 |
1 files changed, 25 insertions, 2 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 6723ff7..41e9c53 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -1,6 +1,7 @@ -import uuid +import os import time import unittest +import uuid from kafka import * # noqa from kafka.common import * # noqa @@ -13,14 +14,21 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @classmethod def setUpClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) @classmethod def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + cls.server.close() cls.zk.close() + @kafka_versions("all") def test_produce_many_simple(self): start_offset = self.current_offset(self.topic, 0) @@ -36,6 +44,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 100, ) + @kafka_versions("all") def test_produce_10k_simple(self): start_offset = self.current_offset(self.topic, 0) @@ -45,6 +54,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 10000, ) + @kafka_versions("all") def test_produce_many_gzip(self): start_offset = self.current_offset(self.topic, 0) @@ -57,8 +67,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 200, ) - @unittest.skip("All snappy integration tests fail with nosnappyjava") + @kafka_versions("all") def test_produce_many_snappy(self): + self.skipTest("All snappy integration tests fail with nosnappyjava") start_offset = self.current_offset(self.topic, 0) self.assert_produce_request([ @@ -69,6 +80,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 200, ) + @kafka_versions("all") def test_produce_mixed(self): start_offset = self.current_offset(self.topic, 0) @@ -85,6 +97,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_produce_request(messages, start_offset, msg_count) + @kafka_versions("all") def test_produce_100k_gzipped(self): start_offset = self.current_offset(self.topic, 0) @@ -106,6 +119,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # SimpleProducer Tests # ############################ + @kafka_versions("all") def test_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -130,6 +144,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_round_robin_partitioner(self): msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ] @@ -152,6 +167,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_hashed_partitioner(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -174,6 +190,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_acks_none(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -185,6 +202,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) producer.stop() + @kafka_versions("all") def test_acks_local_write(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -197,6 +215,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_acks_cluster_commit(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -211,6 +230,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -259,6 +279,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_batched_simple_producer__triggers_by_time(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -310,6 +331,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_async_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) @@ -322,6 +344,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() + @kafka_versions("all") def test_async_keyed_producer(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) |