summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-05-22 12:06:38 -0700
committerMark Roberts <wizzat@gmail.com>2014-05-22 12:06:38 -0700
commit35a14e18c631508e195f9377a6b5a4861966b3a2 (patch)
treebdff5da1110b9b97150571e46280dbe76307b49d /kafka/producer.py
parentae6b49aca13d2d1df7e7f884b2a99c34aa839e18 (diff)
downloadkafka-python-35a14e18c631508e195f9377a6b5a4861966b3a2.tar.gz
Handle New Topic Creation
Adds ensure_topic_exists to KafkaClient, redirects test case to use that. Fixes #113 and fixes #150.
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py7
1 files changed, 5 insertions, 2 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 8e40be5..95c75c4 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -10,7 +10,7 @@ from itertools import cycle
from multiprocessing import Queue, Process
from kafka.common import (
- ProduceRequest, TopicAndPartition, UnsupportedCodecError
+ ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
)
from kafka.partitioner import HashedPartitioner
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
@@ -216,7 +216,10 @@ class SimpleProducer(Producer):
if topic not in self.partition_cycles:
if topic not in self.client.topic_partitions:
self.client.load_metadata_for_topics(topic)
- self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+ try:
+ self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+ except KeyError:
+ raise UnknownTopicOrPartitionError(topic)
# Randomize the initial partition that is returned
if self.random_start: