Diffstat (limited to 'kafka/producer.py')
-rw-r--r-- | kafka/producer.py | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index b28a424..4a04b38 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -4,11 +4,17 @@ import logging
 import time
 import random
 
-from Queue import Empty
+try:
+    from queue import Empty
+except ImportError:
+    from Queue import Empty
 from collections import defaultdict
 from itertools import cycle
 from multiprocessing import Queue, Process
 
+import six
+from six.moves import xrange
+
 from kafka.common import (
     ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
 )
@@ -173,7 +179,7 @@ class Producer(object):
             raise TypeError("msg is not a list or tuple!")
 
         # Raise TypeError if any message is not encoded as bytes
-        if any(not isinstance(m, bytes) for m in msg):
+        if any(not isinstance(m, six.binary_type) for m in msg):
             raise TypeError("all produce message payloads must be type bytes")
 
         if self.async:
@@ -221,7 +227,7 @@ class SimpleProducer(Producer):
     batch_send_every_t - If set, messages are send after this timeout
     random_start - If true, randomize the initial partition which the
                    the first message block will be published to, otherwise
-                   if false, the first message block will always publish
+                   if false, the first message block will always publish
                    to partition 0 before cycling through each partition
     """
     def __init__(self, client, async=False,
@@ -252,9 +258,9 @@ class SimpleProducer(Producer):
         if self.random_start:
             num_partitions = len(self.client.topic_partitions[topic])
             for _ in xrange(random.randint(0, num_partitions-1)):
-                self.partition_cycles[topic].next()
+                next(self.partition_cycles[topic])
 
-        return self.partition_cycles[topic].next()
+        return next(self.partition_cycles[topic])
 
     def send_messages(self, topic, *msg):
         partition = self._next_partition(topic)
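
The changes above apply the usual Python 2/3 compatibility idioms: import the renamed stdlib queue module with a try/except fallback, check payloads against six.binary_type (bytes on Python 3, str on Python 2), and advance iterators with the builtin next() instead of the Python-2-only .next() method. A minimal standalone sketch of the same idioms follows; the helper functions are illustrative only and are not part of kafka/producer.py.

    # Illustrative sketch only; not part of the commit above.
    try:
        from queue import Empty   # Python 3 module name
    except ImportError:
        from Queue import Empty   # Python 2 fallback

    from itertools import cycle

    import six
    from six.moves import xrange


    def check_payloads(msgs):
        # six.binary_type is bytes on Python 3 and str on Python 2,
        # so raw byte payloads pass on both interpreters.
        if any(not isinstance(m, six.binary_type) for m in msgs):
            raise TypeError("all produce message payloads must be type bytes")


    def partition_cycle(partitions, skip=0):
        # next(it) works on both versions; it.next() exists only on Python 2.
        it = cycle(partitions)
        for _ in xrange(skip):
            next(it)
        return it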