summaryrefslogtreecommitdiff
path: root/test/test_producer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--  test/test_producer_integration.py | 27
1 file changed, 25 insertions, 2 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 6723ff7..41e9c53 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -1,6 +1,7 @@
-import uuid
+import os
import time
import unittest
+import uuid
from kafka import * # noqa
from kafka.common import * # noqa
@@ -13,14 +14,21 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@classmethod
def setUpClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
@classmethod
def tearDownClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
cls.server.close()
cls.zk.close()
+ @kafka_versions("all")
def test_produce_many_simple(self):
start_offset = self.current_offset(self.topic, 0)
@@ -36,6 +44,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
100,
)
+ @kafka_versions("all")
def test_produce_10k_simple(self):
start_offset = self.current_offset(self.topic, 0)
@@ -45,6 +54,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
10000,
)
+ @kafka_versions("all")
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
@@ -57,8 +67,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
200,
)
- @unittest.skip("All snappy integration tests fail with nosnappyjava")
+ @kafka_versions("all")
def test_produce_many_snappy(self):
+ self.skipTest("All snappy integration tests fail with nosnappyjava")
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request([
@@ -69,6 +80,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
200,
)
+ @kafka_versions("all")
def test_produce_mixed(self):
start_offset = self.current_offset(self.topic, 0)
@@ -85,6 +97,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_produce_request(messages, start_offset, msg_count)
+ @kafka_versions("all")
def test_produce_100k_gzipped(self):
start_offset = self.current_offset(self.topic, 0)
@@ -106,6 +119,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# SimpleProducer Tests #
############################
+ @kafka_versions("all")
def test_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
@@ -130,6 +144,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+ @kafka_versions("all")
def test_round_robin_partitioner(self):
msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ]
@@ -152,6 +167,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+ @kafka_versions("all")
def test_hashed_partitioner(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
@@ -174,6 +190,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+ @kafka_versions("all")
def test_acks_none(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
@@ -185,6 +202,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
producer.stop()
+ @kafka_versions("all")
def test_acks_local_write(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
@@ -197,6 +215,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+ @kafka_versions("all")
def test_acks_cluster_commit(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
@@ -211,6 +230,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+ @kafka_versions("all")
def test_batched_simple_producer__triggers_by_message(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
@@ -259,6 +279,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+ @kafka_versions("all")
def test_batched_simple_producer__triggers_by_time(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
@@ -310,6 +331,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+ @kafka_versions("all")
def test_async_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
@@ -322,6 +344,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
+ @kafka_versions("all")
def test_async_keyed_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)