summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py16
1 files changed, 11 insertions, 5 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index b28a424..4a04b38 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -4,11 +4,17 @@ import logging
import time
import random
-from Queue import Empty
+try:
+ from queue import Empty
+except ImportError:
+ from Queue import Empty
from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
+import six
+from six.moves import xrange
+
from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
)
@@ -173,7 +179,7 @@ class Producer(object):
raise TypeError("msg is not a list or tuple!")
# Raise TypeError if any message is not encoded as bytes
- if any(not isinstance(m, bytes) for m in msg):
+ if any(not isinstance(m, six.binary_type) for m in msg):
raise TypeError("all produce message payloads must be type bytes")
if self.async:
@@ -221,7 +227,7 @@ class SimpleProducer(Producer):
batch_send_every_t - If set, messages are send after this timeout
random_start - If true, randomize the initial partition which the
the first message block will be published to, otherwise
- if false, the first message block will always publish
+ if false, the first message block will always publish
to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,
@@ -252,9 +258,9 @@ class SimpleProducer(Producer):
if self.random_start:
num_partitions = len(self.client.topic_partitions[topic])
for _ in xrange(random.randint(0, num_partitions-1)):
- self.partition_cycles[topic].next()
+ next(self.partition_cycles[topic])
- return self.partition_cycles[topic].next()
+ return next(self.partition_cycles[topic])
def send_messages(self, topic, *msg):
partition = self._next_partition(topic)