diff options
-rw-r--r-- | kafka/producer/base.py | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 7f9b18c..6e19b92 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -172,6 +172,8 @@ class Producer(object): return self._send_messages(topic, partition, *msg) def _send_messages(self, topic, partition, *msg, **kwargs): + key = kwargs.pop('key', None) + # Guarantee that msg is actually a list or tuple (should always be true) if not isinstance(msg, (list, tuple)): raise TypeError("msg is not a list or tuple!") @@ -180,7 +182,10 @@ class Producer(object): if any(not isinstance(m, six.binary_type) for m in msg): raise TypeError("all produce message payloads must be type bytes") - key = kwargs.pop('key', None) + # Raise TypeError if the key is not encoded as bytes + if key is not None and not isinstance(key, six.binary_type): + raise TypeError("the key must be type bytes") + if self.async: for m in msg: self.queue.put((TopicAndPartition(topic, partition), m, key)) |