diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 5 | ||||
-rw-r--r-- | kafka/partitioner.py | 56 | ||||
-rw-r--r-- | kafka/producer.py | 34 |
3 files changed, 95 insertions, 0 deletions
diff --git a/kafka/client.py b/kafka/client.py index 5595d49..1146798 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -73,7 +73,12 @@ class KafkaClient(object): self.brokers.update(brokers) self.topics_to_brokers = {} + for topic, partitions in topics.items(): + # Clear the list once before we add it. This removes stale entries + # and avoids duplicates + self.topic_partitions.pop(topic, None) + if not partitions: log.info("Partition is unassigned, delay for 1s and retry") time.sleep(1) diff --git a/kafka/partitioner.py b/kafka/partitioner.py new file mode 100644 index 0000000..84db4d5 --- /dev/null +++ b/kafka/partitioner.py @@ -0,0 +1,56 @@ +from itertools import cycle + + +class Partitioner(object): + """ + Base class for a partitioner + """ + def __init__(self, partitions): + """ + Initialize the partitioner + + partitions - A list of available partitions (during startup) + """ + self.partitions = partitions + + def partition(self, key, partitions): + """ + Takes a string key and num_partitions as argument and returns + a partition to be used for the message + + partitions - The list of partitions is passed in every call. This + may look like an overhead, but it will be useful + (in future) when we handle cases like rebalancing + """ + raise NotImplemented('partition function has to be implemented') + + +class RoundRobinPartitioner(Partitioner): + """ + Implements a round robin partitioner which sends data to partitions + in a round robin fashion + """ + def __init__(self, partitions): + self._set_partitions(partitions) + + def _set_partitions(self, partitions): + self.partitions = partitions + self.iterpart = cycle(partitions) + + def partition(self, key, partitions): + # Refresh the partition list if necessary + if self.partitions != partitions: + self._set_partitions(partitions) + + return self.iterpart.next() + + +class HashedPartitioner(Partitioner): + """ + Implements a partitioner which selects the target partition based on + the hash of the key + """ + def partition(self, key, partitions): + size = len(partitions) + idx = hash(key) % size + return partitions[idx] diff --git a/kafka/producer.py b/kafka/producer.py index 9a8ba7a..413628b 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -4,6 +4,7 @@ import logging from kafka.common import ProduceRequest from kafka.protocol import create_message +from kafka.partitioner import HashedPartitioner log = logging.getLogger("kafka") @@ -72,3 +73,36 @@ class SimpleProducer(Producer): req = ProduceRequest(self.topic, self.next_partition.next(), messages=[create_message(m) for m in msg]) self.send_request(req) + + +class KeyedProducer(Producer): + """ + A producer which distributes messages to partitions based on the key + + Args: + client - The kafka client instance + topic - The kafka topic to send messages to + partitioner - A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + async - If True, the messages are sent asynchronously via another + thread (process). We will not wait for a response to these + """ + def __init__(self, client, topic, partitioner=None, async=False): + self.topic = topic + client._load_metadata_for_topics(topic) + + if not partitioner: + partitioner = HashedPartitioner + + self.partitioner = partitioner(client.topic_partitions[topic]) + + super(KeyedProducer, self).__init__(client, async) + + def send(self, key, msg): + partitions = self.client.topic_partitions[self.topic] + partition = self.partitioner.partition(key, partitions) + + req = ProduceRequest(self.topic, partition, + messages=[create_message(msg)]) + + self.send_request(req) |