summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer.py16
-rw-r--r--test/test_producer_integration.py30
2 files changed, 39 insertions, 7 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 12a2934..8f35963 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import logging
import time
+import random
from Queue import Empty
from collections import defaultdict
@@ -180,14 +181,20 @@ class SimpleProducer(Producer):
batch_send - If True, messages are send in batches
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
+ random_start - If true, randomize the initial partition which the
+ the first message block will be published to, otherwise
+ if false, the first message block will always publish
+ to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ random_start=False):
self.partition_cycles = {}
+ self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
@@ -198,6 +205,13 @@ class SimpleProducer(Producer):
if topic not in self.client.topic_partitions:
self.client.load_metadata_for_topics(topic)
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+
+ # Randomize the initial partition that is returned
+ if self.random_start:
+ num_partitions = len(self.client.topic_partitions[topic])
+ for _ in xrange(random.randint(0, num_partitions-1)):
+ self.partition_cycles[topic].next()
+
return self.partition_cycles[topic].next()
def send_messages(self, topic, *msg):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 9c9dbd3..c69e117 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -124,19 +124,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client)
- # Will go to partition 0
- msg1, msg2, msg3, msg4, msg5 = [ str(uuid.uuid4()) for x in xrange(5) ]
+ # Goes to first partition, randomly.
resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
self.assert_produce_response(resp, start_offset0)
- # Will go to partition 1
+ # Goes to the next partition, randomly.
resp = producer.send_messages(self.topic, self.msg("three"))
self.assert_produce_response(resp, start_offset1)
self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
- # Will go to partition 0
+ # Goes back to the first partition because there's only two partitions
resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
self.assert_produce_response(resp, start_offset0+2)
self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
@@ -144,9 +143,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
@kafka_versions("all")
- def test_round_robin_partitioner(self):
- msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ]
+ def test_producer_random_order(self):
+ producer = SimpleProducer(self.client, random_start = True)
+ resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+ resp2 = producer.send_messages(self.topic, self.msg("three"))
+ resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+
+ self.assertEqual(resp1[0].partition, resp3[0].partition)
+ self.assertNotEqual(resp1[0].partition, resp2[0].partition)
+
+ @kafka_versions("all")
+ def test_producer_ordered_start(self):
+ producer = SimpleProducer(self.client, random_start = False)
+ resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+ resp2 = producer.send_messages(self.topic, self.msg("three"))
+ resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+ self.assertEqual(resp1[0].partition, 0)
+ self.assertEqual(resp2[0].partition, 1)
+ self.assertEqual(resp3[0].partition, 0)
+
+ @kafka_versions("all")
+ def test_round_robin_partitioner(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)