summaryrefslogtreecommitdiff
path: root/test/test_producer_legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer_legacy.py')
-rw-r--r--test/test_producer_legacy.py257
1 files changed, 257 insertions, 0 deletions
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
new file mode 100644
index 0000000..850cb80
--- /dev/null
+++ b/test/test_producer_legacy.py
@@ -0,0 +1,257 @@
+# -*- coding: utf-8 -*-
+
+import collections
+import logging
+import threading
+import time
+
+from mock import MagicMock, patch
+from . import unittest
+
+from kafka import SimpleClient, SimpleProducer, KeyedProducer
+from kafka.common import (
+ AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
+ ProduceResponsePayload, RetryOptions, TopicPartition
+)
+from kafka.producer.base import Producer, _send_upstream
+from kafka.protocol import CODEC_NONE
+
+from six.moves import queue, xrange
+
+
+class TestKafkaProducer(unittest.TestCase):
+ def test_producer_message_types(self):
+
+ producer = Producer(MagicMock())
+ topic = b"test-topic"
+ partition = 0
+
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'}, None,)
+ for m in bad_data_types:
+ with self.assertRaises(TypeError):
+ logging.debug("attempting to send message of type %s", type(m))
+ producer.send_messages(topic, partition, m)
+
+ good_data_types = (b'a string!',)
+ for m in good_data_types:
+ # This should not raise an exception
+ producer.send_messages(topic, partition, m)
+
+ def test_keyedproducer_message_types(self):
+ client = MagicMock()
+ client.get_partition_ids_for_topic.return_value = [0, 1]
+ producer = KeyedProducer(client)
+ topic = b"test-topic"
+ key = b"testkey"
+
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'},)
+ for m in bad_data_types:
+ with self.assertRaises(TypeError):
+ logging.debug("attempting to send message of type %s", type(m))
+ producer.send_messages(topic, key, m)
+
+ good_data_types = (b'a string!', None,)
+ for m in good_data_types:
+ # This should not raise an exception
+ producer.send_messages(topic, key, m)
+
+ def test_topic_message_types(self):
+ client = MagicMock()
+
+ def partitions(topic):
+ return [0, 1]
+
+ client.get_partition_ids_for_topic = partitions
+
+ producer = SimpleProducer(client, random_start=False)
+ topic = b"test-topic"
+ producer.send_messages(topic, b'hi')
+ assert client.send_produce_request.called
+
+ @patch('kafka.producer.base._send_upstream')
+ def test_producer_async_queue_overfilled(self, mock):
+ queue_size = 2
+ producer = Producer(MagicMock(), async=True,
+ async_queue_maxsize=queue_size)
+
+ topic = b'test-topic'
+ partition = 0
+ message = b'test-message'
+
+ with self.assertRaises(AsyncProducerQueueFull):
+ message_list = [message] * (queue_size + 1)
+ producer.send_messages(topic, partition, *message_list)
+ self.assertEqual(producer.queue.qsize(), queue_size)
+ for _ in xrange(producer.queue.qsize()):
+ producer.queue.get()
+
+ def test_producer_sync_fail_on_error(self):
+ error = FailedPayloadsError('failure')
+ with patch.object(SimpleClient, 'load_metadata_for_topics'):
+ with patch.object(SimpleClient, 'ensure_topic_exists'):
+ with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+ with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
+
+ client = SimpleClient(MagicMock())
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+
+ # This should not raise
+ (response,) = producer.send_messages('foobar', b'test message')
+ self.assertEqual(response, error)
+
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+ with self.assertRaises(FailedPayloadsError):
+ producer.send_messages('foobar', b'test message')
+
+ def test_cleanup_is_not_called_on_stopped_producer(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = True
+ with patch.object(producer, 'stop') as mocked_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(mocked_stop.call_count, 0)
+
+ def test_cleanup_is_called_on_running_producer(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = False
+ with patch.object(producer, 'stop') as mocked_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(mocked_stop.call_count, 1)
+
+
+class TestKafkaProducerSendUpstream(unittest.TestCase):
+
+ def setUp(self):
+ self.client = MagicMock()
+ self.queue = queue.Queue()
+
+ def _run_process(self, retries_limit=3, sleep_timeout=1):
+ # run _send_upstream process with the queue
+ stop_event = threading.Event()
+ retry_options = RetryOptions(limit=retries_limit,
+ backoff_ms=50,
+ retry_on_timeouts=False)
+ self.thread = threading.Thread(
+ target=_send_upstream,
+ args=(self.queue, self.client, CODEC_NONE,
+ 0.3, # batch time (seconds)
+ 3, # batch length
+ Producer.ACK_AFTER_LOCAL_WRITE,
+ Producer.DEFAULT_ACK_TIMEOUT,
+ retry_options,
+ stop_event))
+ self.thread.daemon = True
+ self.thread.start()
+ time.sleep(sleep_timeout)
+ stop_event.set()
+
+ def test_wo_retries(self):
+
+ # lets create a queue and add 10 messages for 1 partition
+ for i in range(10):
+ self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
+
+ self._run_process()
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 4 non-void cals:
+ # 3 batches of 3 msgs each + 1 batch of 1 message
+ self.assertEqual(self.client.send_produce_request.call_count, 4)
+
+ def test_first_send_failed(self):
+
+ # lets create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
+
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+ self.client.is_first_time = True
+ def send_side_effect(reqs, *args, **kwargs):
+ if self.client.is_first_time:
+ self.client.is_first_time = False
+ return [FailedPayloadsError(req) for req in reqs]
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponsePayload(req.topic, req.partition, 0, offset)
+ )
+ return responses
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # plus 3 batches of 3 msgs each + 1 batch of 1 message
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
+
+ def test_with_limited_retries(self):
+
+ # lets create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
+
+ def send_side_effect(reqs, *args, **kwargs):
+ return [FailedPayloadsError(req) for req in reqs]
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(3, 3)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 16 non-void calls:
+ # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
+ # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
+ self.assertEqual(self.client.send_produce_request.call_count, 16)
+
+ def test_async_producer_not_leader(self):
+
+ for i in range(10):
+ self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
+
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+ self.client.is_first_time = True
+ def send_side_effect(reqs, *args, **kwargs):
+ if self.client.is_first_time:
+ self.client.is_first_time = False
+ return [ProduceResponsePayload(req.topic, req.partition,
+ NotLeaderForPartitionError.errno, -1)
+ for req in reqs]
+
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponsePayload(req.topic, req.partition, 0, offset)
+ )
+ return responses
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
+
+ def tearDown(self):
+ for _ in xrange(self.queue.qsize()):
+ self.queue.get()