summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_producer_integration.py9
-rw-r--r--test/testutil.py13
2 files changed, 10 insertions, 12 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index c69e117..0718cb3 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -143,6 +143,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
@kafka_versions("all")
+ def test_produce__new_topic_fails_with_reasonable_error(self):
+ new_topic = 'new_topic_{}'.format(str(uuid.uuid4()))
+ producer = SimpleProducer(self.client)
+
+ # At first it doesn't exist
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ resp = producer.send_messages(new_topic, self.msg("one"))
+
+ @kafka_versions("all")
def test_producer_random_order(self):
producer = SimpleProducer(self.client, random_start = True)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
diff --git a/test/testutil.py b/test/testutil.py
index 78e6f7d..4f5f6ee 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -13,7 +13,6 @@ from kafka import KafkaClient
__all__ = [
'random_string',
- 'ensure_topic_creation',
'get_open_port',
'kafka_versions',
'KafkaIntegrationTestCase',
@@ -39,16 +38,6 @@ def kafka_versions(*versions):
return wrapper
return kafka_versions
-def ensure_topic_creation(client, topic_name, timeout = 30):
- start_time = time.time()
-
- client.load_metadata_for_topics(topic_name)
- while not client.has_metadata_for_topic(topic_name):
- if time.time() > start_time + timeout:
- raise Exception("Unable to create topic %s" % topic_name)
- client.load_metadata_for_topics(topic_name)
- time.sleep(1)
-
def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
@@ -71,7 +60,7 @@ class KafkaIntegrationTestCase(unittest2.TestCase):
if self.create_client:
self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
- ensure_topic_creation(self.client, self.topic)
+ self.client.ensure_topic_exists(self.topic)
self._messages = {}