diff options
Diffstat (limited to 'test/test_producer_legacy.py')
-rw-r--r-- | test/test_producer_legacy.py | 257 |
1 files changed, 257 insertions, 0 deletions
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py new file mode 100644 index 0000000..850cb80 --- /dev/null +++ b/test/test_producer_legacy.py @@ -0,0 +1,257 @@ +# -*- coding: utf-8 -*- + +import collections +import logging +import threading +import time + +from mock import MagicMock, patch +from . import unittest + +from kafka import SimpleClient, SimpleProducer, KeyedProducer +from kafka.common import ( + AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, + ProduceResponsePayload, RetryOptions, TopicPartition +) +from kafka.producer.base import Producer, _send_upstream +from kafka.protocol import CODEC_NONE + +from six.moves import queue, xrange + + +class TestKafkaProducer(unittest.TestCase): + def test_producer_message_types(self): + + producer = Producer(MagicMock()) + topic = b"test-topic" + partition = 0 + + bad_data_types = (u'你怎么样?', 12, ['a', 'list'], + ('a', 'tuple'), {'a': 'dict'}, None,) + for m in bad_data_types: + with self.assertRaises(TypeError): + logging.debug("attempting to send message of type %s", type(m)) + producer.send_messages(topic, partition, m) + + good_data_types = (b'a string!',) + for m in good_data_types: + # This should not raise an exception + producer.send_messages(topic, partition, m) + + def test_keyedproducer_message_types(self): + client = MagicMock() + client.get_partition_ids_for_topic.return_value = [0, 1] + producer = KeyedProducer(client) + topic = b"test-topic" + key = b"testkey" + + bad_data_types = (u'你怎么样?', 12, ['a', 'list'], + ('a', 'tuple'), {'a': 'dict'},) + for m in bad_data_types: + with self.assertRaises(TypeError): + logging.debug("attempting to send message of type %s", type(m)) + producer.send_messages(topic, key, m) + + good_data_types = (b'a string!', None,) + for m in good_data_types: + # This should not raise an exception + producer.send_messages(topic, key, m) + + def test_topic_message_types(self): + client = MagicMock() + + def partitions(topic): + return [0, 1] + + client.get_partition_ids_for_topic = partitions + + producer = SimpleProducer(client, random_start=False) + topic = b"test-topic" + producer.send_messages(topic, b'hi') + assert client.send_produce_request.called + + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled(self, mock): + queue_size = 2 + producer = Producer(MagicMock(), async=True, + async_queue_maxsize=queue_size) + + topic = b'test-topic' + partition = 0 + message = b'test-message' + + with self.assertRaises(AsyncProducerQueueFull): + message_list = [message] * (queue_size + 1) + producer.send_messages(topic, partition, *message_list) + self.assertEqual(producer.queue.qsize(), queue_size) + for _ in xrange(producer.queue.qsize()): + producer.queue.get() + + def test_producer_sync_fail_on_error(self): + error = FailedPayloadsError('failure') + with patch.object(SimpleClient, 'load_metadata_for_topics'): + with patch.object(SimpleClient, 'ensure_topic_exists'): + with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]): + with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]): + + client = SimpleClient(MagicMock()) + producer = SimpleProducer(client, async=False, sync_fail_on_error=False) + + # This should not raise + (response,) = producer.send_messages('foobar', b'test message') + self.assertEqual(response, error) + + producer = SimpleProducer(client, async=False, sync_fail_on_error=True) + with self.assertRaises(FailedPayloadsError): + producer.send_messages('foobar', b'test message') + + def test_cleanup_is_not_called_on_stopped_producer(self): + producer = Producer(MagicMock(), async=True) + producer.stopped = True + with patch.object(producer, 'stop') as mocked_stop: + producer._cleanup_func(producer) + self.assertEqual(mocked_stop.call_count, 0) + + def test_cleanup_is_called_on_running_producer(self): + producer = Producer(MagicMock(), async=True) + producer.stopped = False + with patch.object(producer, 'stop') as mocked_stop: + producer._cleanup_func(producer) + self.assertEqual(mocked_stop.call_count, 1) + + +class TestKafkaProducerSendUpstream(unittest.TestCase): + + def setUp(self): + self.client = MagicMock() + self.queue = queue.Queue() + + def _run_process(self, retries_limit=3, sleep_timeout=1): + # run _send_upstream process with the queue + stop_event = threading.Event() + retry_options = RetryOptions(limit=retries_limit, + backoff_ms=50, + retry_on_timeouts=False) + self.thread = threading.Thread( + target=_send_upstream, + args=(self.queue, self.client, CODEC_NONE, + 0.3, # batch time (seconds) + 3, # batch length + Producer.ACK_AFTER_LOCAL_WRITE, + Producer.DEFAULT_ACK_TIMEOUT, + retry_options, + stop_event)) + self.thread.daemon = True + self.thread.start() + time.sleep(sleep_timeout) + stop_event.set() + + def test_wo_retries(self): + + # lets create a queue and add 10 messages for 1 partition + for i in range(10): + self.queue.put((TopicPartition("test", 0), "msg %i", "key %i")) + + self._run_process() + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 4 non-void cals: + # 3 batches of 3 msgs each + 1 batch of 1 message + self.assertEqual(self.client.send_produce_request.call_count, 4) + + def test_first_send_failed(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicPartition("test", i), "msg %i", "key %i")) + + # Mock offsets counter for closure + offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0)) + self.client.is_first_time = True + def send_side_effect(reqs, *args, **kwargs): + if self.client.is_first_time: + self.client.is_first_time = False + return [FailedPayloadsError(req) for req in reqs] + responses = [] + for req in reqs: + offset = offsets[req.topic][req.partition] + offsets[req.topic][req.partition] += len(req.messages) + responses.append( + ProduceResponsePayload(req.topic, req.partition, 0, offset) + ) + return responses + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 5 non-void calls: 1st failed batch of 3 msgs + # plus 3 batches of 3 msgs each + 1 batch of 1 message + self.assertEqual(self.client.send_produce_request.call_count, 5) + + def test_with_limited_retries(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i)) + + def send_side_effect(reqs, *args, **kwargs): + return [FailedPayloadsError(req) for req in reqs] + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(3, 3) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 16 non-void calls: + # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg + + # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16 + self.assertEqual(self.client.send_produce_request.call_count, 16) + + def test_async_producer_not_leader(self): + + for i in range(10): + self.queue.put((TopicPartition("test", i), "msg %i", "key %i")) + + # Mock offsets counter for closure + offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0)) + self.client.is_first_time = True + def send_side_effect(reqs, *args, **kwargs): + if self.client.is_first_time: + self.client.is_first_time = False + return [ProduceResponsePayload(req.topic, req.partition, + NotLeaderForPartitionError.errno, -1) + for req in reqs] + + responses = [] + for req in reqs: + offset = offsets[req.topic][req.partition] + offsets[req.topic][req.partition] += len(req.messages) + responses.append( + ProduceResponsePayload(req.topic, req.partition, 0, offset) + ) + return responses + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 5 non-void calls: 1st failed batch of 3 msgs + # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 + self.assertEqual(self.client.send_produce_request.call_count, 5) + + def tearDown(self): + for _ in xrange(self.queue.qsize()): + self.queue.get() |