-rw-r--r-- | README.md                         |   4 |
-rw-r--r-- | kafka/producer.py                 |   6 |
-rw-r--r-- | test/test_failover_integration.py | 112 |
3 files changed, 76 insertions, 46 deletions
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -54,6 +54,8 @@ producer.send_messages("my-topic", "this method", "is variadic")
 producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
 
 # To send messages asynchronously
+# WARNING: current implementation does not guarantee message delivery on failure!
+# messages can get dropped! Use at your own risk! Or help us improve with a PR!
 producer = SimpleProducer(kafka, async=True)
 producer.send_messages("my-topic", "async message")
 
@@ -66,7 +68,7 @@ producer = SimpleProducer(kafka, async=False,
                           req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                           ack_timeout=2000)
 
-response = producer.send_messages("my-topic", "async message")
+response = producer.send_messages("my-topic", "another message")
 
 if response:
     print(response[0].error)
diff --git a/kafka/producer.py b/kafka/producer.py
index 800e677..8a6bff0 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -87,6 +87,9 @@ class Producer(object):
     client - The Kafka client instance to use
     async - If set to true, the messages are sent asynchronously via another
             thread (process). We will not wait for a response to these
+            WARNING!!! current implementation of async producer does not
+            guarantee message delivery. Use at your own risk! Or help us
+            improve with a PR!
     req_acks - A value indicating the acknowledgements that the server must
                receive before responding to the request
     ack_timeout - Value (in milliseconds) indicating a timeout for waiting
@@ -131,6 +134,9 @@ class Producer(object):
         self.codec = codec
 
         if self.async:
+            log.warning("async producer does not guarantee message delivery!")
+            log.warning("Current implementation does not retry Failed messages")
+            log.warning("Use at your own risk! (or help improve with a PR!)")
             self.queue = Queue()  # Messages are sent through this queue
             self.proc = Process(target=_send_upstream,
                                 args=(self.queue,
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 6298f62..20a7f28 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -1,11 +1,15 @@
+import logging
 import os
 import time
+import unittest2
 
 from kafka import *  # noqa
 from kafka.common import *  # noqa
+from kafka.producer import Producer
 from fixtures import ZookeeperFixture, KafkaFixture
 from testutil import *
 
+
 class TestFailover(KafkaIntegrationTestCase):
     create_client = False
 
@@ -39,82 +43,100 @@ class TestFailover(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_switch_leader(self):
         key, topic, partition = random_string(5), self.topic, 0
-        producer = SimpleProducer(self.client)
 
-        for i in range(1, 4):
+        # Test the base class Producer -- send_messages to a specific partition
+        producer = Producer(self.client, async=False)
 
-            # XXX unfortunately, the conns dict needs to be warmed for this to work
-            # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
-            self._send_random_messages(producer, self.topic, 10)
+        # Send 10 random messages
+        self._send_random_messages(producer, topic, partition, 10)
 
-            # kil leader for partition 0
-            broker = self._kill_leader(topic, partition)
+        # kill leader for partition
+        broker = self._kill_leader(topic, partition)
 
-            # expect failure, reload meta data
-            with self.assertRaises(FailedPayloadsError):
-                producer.send_messages(self.topic, 'part 1')
-                producer.send_messages(self.topic, 'part 2')
-            time.sleep(1)
+        # expect failure, but dont wait more than 60 secs to recover
+        recovered = False
+        started = time.time()
+        timeout = 60
+        while not recovered and (time.time() - started) < timeout:
+            try:
+                logging.debug("attempting to send 'success' message after leader killed")
+                producer.send_messages(topic, partition, 'success')
+                logging.debug("success!")
+                recovered = True
+            except (FailedPayloadsError, ConnectionError):
+                logging.debug("caught exception sending message -- will retry")
+                continue
 
-            # send to new leader
-            self._send_random_messages(producer, self.topic, 10)
+        # Verify we successfully sent the message
+        self.assertTrue(recovered)
 
-            broker.open()
-            time.sleep(3)
+        # send some more messages to new leader
+        self._send_random_messages(producer, topic, partition, 10)
 
-            # count number of messages
-            count = self._count_messages('test_switch_leader group %s' % i, topic)
-            self.assertIn(count, range(20 * i, 22 * i + 1))
+        # count number of messages
+        count = self._count_messages('test_switch_leader group', topic, partition)
 
-        producer.stop()
+        # Should be equal to 10 before + 1 recovery + 10 after
+        self.assertEquals(count, 21)
 
-    @kafka_versions("all")
+
+    #@kafka_versions("all")
+    @unittest2.skip("async producer does not support reliable failover yet")
     def test_switch_leader_async(self):
         key, topic, partition = random_string(5), self.topic, 0
-        producer = SimpleProducer(self.client, async=True)
-
-        for i in range(1, 4):
-            self._send_random_messages(producer, self.topic, 10)
+
+        # Test the base class Producer -- send_messages to a specific partition
+        producer = Producer(self.client, async=True)
 
-            # kil leader for partition 0
-            broker = self._kill_leader(topic, partition)
+        # Send 10 random messages
+        self._send_random_messages(producer, topic, partition, 10)
 
-            # expect failure, reload meta data
-            producer.send_messages(self.topic, 'part 1')
-            producer.send_messages(self.topic, 'part 2')
-            time.sleep(1)
+        # kill leader for partition
+        broker = self._kill_leader(topic, partition)
 
-            # send to new leader
-            self._send_random_messages(producer, self.topic, 10)
+        logging.debug("attempting to send 'success' message after leader killed")
 
-            broker.open()
-            time.sleep(3)
+        # in async mode, this should return immediately
+        producer.send_messages(topic, partition, 'success')
 
-            # count number of messages
-            count = self._count_messages('test_switch_leader_async group %s' % i, topic)
-            self.assertIn(count, range(20 * i, 22 * i + 1))
+        # send to new leader
+        self._send_random_messages(producer, topic, partition, 10)
 
+        # wait until producer queue is empty
+        while not producer.queue.empty():
+            time.sleep(0.1)
         producer.stop()
 
-    def _send_random_messages(self, producer, topic, n):
+        # count number of messages
+        count = self._count_messages('test_switch_leader_async group', topic, partition)
+
+        # Should be equal to 10 before + 1 recovery + 10 after
+        self.assertEquals(count, 21)
+
+
+    def _send_random_messages(self, producer, topic, partition, n):
         for j in range(n):
-            resp = producer.send_messages(topic, random_string(10))
+            logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
+            resp = producer.send_messages(topic, partition, random_string(10))
             if len(resp) > 0:
                 self.assertEquals(resp[0].error, 0)
-        time.sleep(1)  # give it some time
+            logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
 
     def _kill_leader(self, topic, partition):
         leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
         broker = self.brokers[leader.nodeId]
         broker.close()
-        time.sleep(1)  # give it some time
         return broker
 
-    def _count_messages(self, group, topic):
-        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+    def _count_messages(self, group, topic, timeout=1):
+        hosts = ','.join(['%s:%d' % (broker.host, broker.port)
+                          for broker in self.brokers])
+
         client = KafkaClient(hosts)
-        consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
+        consumer = SimpleConsumer(client, group, topic,
+                                  auto_commit=False,
+                                  iter_timeout=timeout)
+
         all_messages = []
         for message in consumer:
             all_messages.append(message)
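
Taken together, the commit does two things: it documents that the async producer can silently drop messages when a broker fails, and it reworks the synchronous failover test to retry sends until a new partition leader is elected. The retry loop in test_switch_leader generalizes to application code. The sketch below is only an illustration of that pattern against the kafka-python API of this revision (KafkaClient with a comma-separated host string, the base Producer class, FailedPayloadsError and ConnectionError from kafka.common); the broker list, topic, and send_with_retry helper are hypothetical, and the code is Python 2 era like the rest of the commit (async was not yet a reserved word).

# Sketch only: application-level retry around a synchronous send during a
# leader failover, mirroring test_switch_leader above. The broker list, topic,
# and send_with_retry helper are illustrative, not part of the repository.
import logging
import time

from kafka import KafkaClient
from kafka.common import FailedPayloadsError, ConnectionError
from kafka.producer import Producer

client = KafkaClient('kafka1:9092,kafka2:9092')  # hypothetical broker list
producer = Producer(client, async=False)         # synchronous sends only

def send_with_retry(topic, partition, msg, timeout=60):
    # Keep retrying until the cluster elects a new leader or the timeout expires.
    started = time.time()
    while (time.time() - started) < timeout:
        try:
            return producer.send_messages(topic, partition, msg)
        except (FailedPayloadsError, ConnectionError):
            # Leader is gone or metadata is stale; back off briefly and retry.
            logging.debug("send failed -- leader may be down, retrying")
            time.sleep(0.5)
    raise RuntimeError("could not deliver message within %d seconds" % timeout)

send_with_retry('my-topic', 0, 'important message')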