summaryrefslogtreecommitdiff
path: root/test/test_producer_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2014-09-07 12:17:54 -0700
committerDana Powers <dana.powers@gmail.com>2014-09-07 12:17:54 -0700
commita99384f4c601d127ab1c4fe5b272ea5c07fd695d (patch)
treed559e3c3f650dab1ce9247aa7a89f41bdd410e46 /test/test_producer_integration.py
parent9856cc36d7742922133af0aa53767c8ed4731957 (diff)
parent1b282d21522d101f4129d5fc3e70e2b904d3b171 (diff)
downloadkafka-python-a99384f4c601d127ab1c4fe5b272ea5c07fd695d.tar.gz
Merge pull request #221 from dpkp/minor_cleanups
Minor cleanups
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--test/test_producer_integration.py24
1 files changed, 13 insertions, 11 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 7d3a180..19d3a6d 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -2,11 +2,18 @@ import os
import time
import uuid
-from kafka import * # noqa
-from kafka.common import * # noqa
-from kafka.codec import has_gzip, has_snappy
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+from kafka import (
+ SimpleProducer, KeyedProducer,
+ create_message, create_gzip_message, create_snappy_message,
+ RoundRobinPartitioner, HashedPartitioner
+)
+from kafka.common import (
+ FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
+)
+from kafka.codec import has_snappy
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, kafka_versions
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
topic = 'produce_topic'
@@ -149,7 +156,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# At first it doesn't exist
with self.assertRaises(UnknownTopicOrPartitionError):
- resp = producer.send_messages(new_topic, self.msg("one"))
+ producer.send_messages(new_topic, self.msg("one"))
@kafka_versions("all")
def test_producer_random_order(self):
@@ -219,7 +226,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_none(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -231,7 +237,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_local_write(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -244,7 +249,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_cluster_commit(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(
self.client,
@@ -360,7 +364,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_async_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, async=True)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -373,7 +376,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_async_keyed_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)