diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-05-22 12:06:38 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-05-22 12:06:38 -0700 |
commit | 35a14e18c631508e195f9377a6b5a4861966b3a2 (patch) | |
tree | bdff5da1110b9b97150571e46280dbe76307b49d /test | |
parent | ae6b49aca13d2d1df7e7f884b2a99c34aa839e18 (diff) | |
download | kafka-python-35a14e18c631508e195f9377a6b5a4861966b3a2.tar.gz |
Handle New Topic Creation
Adds ensure_topic_exists to KafkaClient, redirects test case to use
that. Fixes #113 and fixes #150.
Diffstat (limited to 'test')
-rw-r--r-- | test/test_producer_integration.py | 9 | ||||
-rw-r--r-- | test/testutil.py | 13 |
2 files changed, 10 insertions, 12 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index c69e117..0718cb3 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -143,6 +143,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() @kafka_versions("all") + def test_produce__new_topic_fails_with_reasonable_error(self): + new_topic = 'new_topic_{}'.format(str(uuid.uuid4())) + producer = SimpleProducer(self.client) + + # At first it doesn't exist + with self.assertRaises(UnknownTopicOrPartitionError): + resp = producer.send_messages(new_topic, self.msg("one")) + + @kafka_versions("all") def test_producer_random_order(self): producer = SimpleProducer(self.client, random_start = True) resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) diff --git a/test/testutil.py b/test/testutil.py index 78e6f7d..4f5f6ee 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -13,7 +13,6 @@ from kafka import KafkaClient __all__ = [ 'random_string', - 'ensure_topic_creation', 'get_open_port', 'kafka_versions', 'KafkaIntegrationTestCase', @@ -39,16 +38,6 @@ def kafka_versions(*versions): return wrapper return kafka_versions -def ensure_topic_creation(client, topic_name, timeout = 30): - start_time = time.time() - - client.load_metadata_for_topics(topic_name) - while not client.has_metadata_for_topic(topic_name): - if time.time() > start_time + timeout: - raise Exception("Unable to create topic %s" % topic_name) - client.load_metadata_for_topics(topic_name) - time.sleep(1) - def get_open_port(): sock = socket.socket() sock.bind(("", 0)) @@ -71,7 +60,7 @@ class KafkaIntegrationTestCase(unittest2.TestCase): if self.create_client: self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port)) - ensure_topic_creation(self.client, self.topic) + self.client.ensure_topic_exists(self.topic) self._messages = {} |