diff options
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/client.py               | 3 +++
-rw-r--r--  kafka/consumer/base.py        | 6 +++---
-rw-r--r--  kafka/consumer/multiprocess.py | 2 +-
-rw-r--r--  kafka/producer/base.py        | 6 ++++++
-rw-r--r--  kafka/producer/keyed.py       | 4 ++++
5 files changed, 17 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py
index c36cd08..4cd9e24 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -258,12 +258,14 @@ class KafkaClient(object):
         self.topic_partitions.clear()
 
     def has_metadata_for_topic(self, topic):
+        topic = kafka_bytestring(topic)
         return (
             topic in self.topic_partitions
             and len(self.topic_partitions[topic]) > 0
         )
 
     def get_partition_ids_for_topic(self, topic):
+        topic = kafka_bytestring(topic)
         if topic not in self.topic_partitions:
             return []
 
@@ -312,6 +314,7 @@ class KafkaClient(object):
         Partition-level errors will also not be raised here
         (a single partition w/o a leader, for example)
         """
+        topics = [kafka_bytestring(t) for t in topics]
         resp = self.send_metadata_request(topics)
 
         log.debug("Broker metadata: %s", resp.brokers)
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 0bbf46c..2bd42eb 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -10,7 +10,7 @@ from kafka.common import (
     UnknownTopicOrPartitionError, check_error
 )
 
-from kafka.util import ReentrantTimer
+from kafka.util import kafka_bytestring, ReentrantTimer
 
 log = logging.getLogger("kafka")
 
@@ -44,8 +44,8 @@ class Consumer(object):
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL):
 
         self.client = client
-        self.topic = topic
-        self.group = group
+        self.topic = kafka_bytestring(topic)
+        self.group = None if group is None else kafka_bytestring(group)
         self.client.load_metadata_for_topics(topic)
         self.offsets = {}
 
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 3acd470..cfe0ef6 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -163,7 +163,7 @@ class MultiProcessConsumer(Consumer):
             simple_consumer_options.pop('partitions', None)
             options.update(simple_consumer_options)
 
-            args = (client.copy(), group, topic, self.queue,
+            args = (client.copy(), self.group, self.topic, self.queue,
                     self.size, self.events)
             proc = Process(target=_mp_consume, args=args, kwargs=options)
             proc.daemon = True
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 6a5a94e..00c4d46 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -17,6 +17,7 @@ from kafka.common import (
     ProduceRequest, TopicAndPartition, UnsupportedCodecError
 )
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+from kafka.util import kafka_bytestring
 
 log = logging.getLogger("kafka")
 
@@ -170,6 +171,7 @@ class Producer(object):
         All messages produced via this method will set the message 'key' to Null
         """
+        topic = kafka_bytestring(topic)
         return self._send_messages(topic, partition, *msg)
 
     def _send_messages(self, topic, partition, *msg, **kwargs):
@@ -183,6 +185,10 @@ class Producer(object):
         if any(not isinstance(m, six.binary_type) for m in msg):
             raise TypeError("all produce message payloads must be type bytes")
 
+        # Raise TypeError if topic is not encoded as bytes
+        if not isinstance(topic, six.binary_type):
+            raise TypeError("the topic must be type bytes")
+
         # Raise TypeError if the key is not encoded as bytes
         if key is not None and not isinstance(key, six.binary_type):
             raise TypeError("the key must be type bytes")
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 36328ed..333b6c0 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import
 import logging
 
 from kafka.partitioner import HashedPartitioner
+from kafka.util import kafka_bytestring
+
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
     BATCH_SEND_MSG_COUNT
@@ -57,10 +59,12 @@ class KeyedProducer(Producer):
         return partitioner.partition(key)
 
     def send_messages(self,topic,key,*msg):
+        topic = kafka_bytestring(topic)
         partition = self._next_partition(topic, key)
         return self._send_messages(topic, partition, *msg,key=key)
 
     def send(self, topic, key, msg):
+        topic = kafka_bytestring(topic)
         partition = self._next_partition(topic, key)
         return self._send_messages(topic, partition, msg, key=key)