summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py3
-rw-r--r--kafka/consumer/base.py6
-rw-r--r--kafka/consumer/multiprocess.py2
-rw-r--r--kafka/producer/base.py6
-rw-r--r--kafka/producer/keyed.py4
5 files changed, 17 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py
index c36cd08..4cd9e24 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -258,12 +258,14 @@ class KafkaClient(object):
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
+ topic = kafka_bytestring(topic)
return (
topic in self.topic_partitions
and len(self.topic_partitions[topic]) > 0
)
def get_partition_ids_for_topic(self, topic):
+ topic = kafka_bytestring(topic)
if topic not in self.topic_partitions:
return []
@@ -312,6 +314,7 @@ class KafkaClient(object):
Partition-level errors will also not be raised here
(a single partition w/o a leader, for example)
"""
+ topics = [kafka_bytestring(t) for t in topics]
resp = self.send_metadata_request(topics)
log.debug("Broker metadata: %s", resp.brokers)
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 0bbf46c..2bd42eb 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -10,7 +10,7 @@ from kafka.common import (
UnknownTopicOrPartitionError, check_error
)
-from kafka.util import ReentrantTimer
+from kafka.util import kafka_bytestring, ReentrantTimer
log = logging.getLogger("kafka")
@@ -44,8 +44,8 @@ class Consumer(object):
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
self.client = client
- self.topic = topic
- self.group = group
+ self.topic = kafka_bytestring(topic)
+ self.group = None if group is None else kafka_bytestring(group)
self.client.load_metadata_for_topics(topic)
self.offsets = {}
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 3acd470..cfe0ef6 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -163,7 +163,7 @@ class MultiProcessConsumer(Consumer):
simple_consumer_options.pop('partitions', None)
options.update(simple_consumer_options)
- args = (client.copy(), group, topic, self.queue,
+ args = (client.copy(), self.group, self.topic, self.queue,
self.size, self.events)
proc = Process(target=_mp_consume, args=args, kwargs=options)
proc.daemon = True
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 6a5a94e..00c4d46 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -17,6 +17,7 @@ from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError
)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+from kafka.util import kafka_bytestring
log = logging.getLogger("kafka")
@@ -170,6 +171,7 @@ class Producer(object):
All messages produced via this method will set the message 'key' to Null
"""
+ topic = kafka_bytestring(topic)
return self._send_messages(topic, partition, *msg)
def _send_messages(self, topic, partition, *msg, **kwargs):
@@ -183,6 +185,10 @@ class Producer(object):
if any(not isinstance(m, six.binary_type) for m in msg):
raise TypeError("all produce message payloads must be type bytes")
+ # Raise TypeError if topic is not encoded as bytes
+ if not isinstance(topic, six.binary_type):
+ raise TypeError("the topic must be type bytes")
+
# Raise TypeError if the key is not encoded as bytes
if key is not None and not isinstance(key, six.binary_type):
raise TypeError("the key must be type bytes")
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 36328ed..333b6c0 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import
import logging
from kafka.partitioner import HashedPartitioner
+from kafka.util import kafka_bytestring
+
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
@@ -57,10 +59,12 @@ class KeyedProducer(Producer):
return partitioner.partition(key)
def send_messages(self,topic,key,*msg):
+ topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, *msg,key=key)
def send(self, topic, key, msg):
+ topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, msg, key=key)