author     Dana Powers <dana.powers@gmail.com>    2016-01-24 18:36:46 -0800
committer  Dana Powers <dana.powers@gmail.com>    2016-01-24 18:36:46 -0800
commit     077dc4742ffa82584946379790424faf4c6ba47f (patch)
tree       bd14706a8dfc429f6bf211bac02ad21af967c6ce /test
parent     48e96822b3ec4f897438a2d1cdb735f51648cb48 (diff)
parent     85c0dd2579eb6aa0b9492d9082d0f4cf4d8ea39d (diff)
Merge pull request #515 from dpkp/kafka_producer
KafkaProducer
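
For orientation, here is a minimal sketch of the KafkaProducer/KafkaConsumer round trip this merge introduces, mirroring the new test_end_to_end test in the diff below. The broker address and topic name are illustrative placeholders, not values from the commit:

    from kafka import KafkaConsumer, KafkaProducer

    # produce: serialize str values to bytes; block up to 10s waiting for
    # metadata or buffer space before raising
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             max_block_ms=10000,
                             value_serializer=str.encode)
    for i in range(10):
        producer.send('example-topic', 'msg %d' % i)
    producer.flush()   # block until every buffered record has been sent
    producer.close()

    # consume: deserialize bytes back to str; iteration stops (StopIteration
    # ends the for loop) after consumer_timeout_ms with no records
    consumer = KafkaConsumer('example-topic',
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=10000,
                             value_deserializer=bytes.decode)
    for msg in consumer:
        print(msg.value)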
Diffstat (limited to 'test')
 -rw-r--r--  test/conftest.py             |  33
 -rw-r--r--  test/fixtures.py             |   3
 -rw-r--r--  test/test_consumer_group.py  |  30
 -rw-r--r--  test/test_partitioner.py     |  64
 -rw-r--r--  test/test_producer.py        | 291
 -rw-r--r--  test/test_producer_legacy.py | 257
6 files changed, 369 insertions(+), 309 deletions(-)
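
The main structural change below is the new test/conftest.py: the version/zookeeper/kafka_broker fixtures previously defined inline in test_consumer_group.py are now shared across test modules. A sketch of how a module consumes them, following the pattern the updated test_producer.py uses; the test name here is hypothetical, and this assumes it runs inside this repo's test suite with KAFKA_VERSION exported (e.g. KAFKA_VERSION=0.9.0.0):

    import pytest

    from test.conftest import version  # fixture, also called directly below


    @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
    def test_broker_connect(kafka_broker):
        # pytest auto-discovers conftest.py and injects the module-scoped
        # kafka_broker fixture (plus its zookeeper dependency); the registered
        # finalizers tear both down when the module finishes
        connect_str = 'localhost:' + str(kafka_broker.port)
        assert connect_str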
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 0000000..f3a8947
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,33 @@
+import os
+
+import pytest
+
+from test.fixtures import KafkaFixture, ZookeeperFixture
+
+
+@pytest.fixture(scope="module")
+def version():
+    if 'KAFKA_VERSION' not in os.environ:
+        return ()
+    return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
+
+
+@pytest.fixture(scope="module")
+def zookeeper(version, request):
+    assert version
+    zk = ZookeeperFixture.instance()
+    def fin():
+        zk.close()
+    request.addfinalizer(fin)
+    return zk
+
+
+@pytest.fixture(scope="module")
+def kafka_broker(version, zookeeper, request):
+    assert version
+    k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
+                              partitions=4)
+    def fin():
+        k.close()
+    request.addfinalizer(fin)
+    return k
diff --git a/test/fixtures.py b/test/fixtures.py
index 91a67c1..2613a41
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -5,10 +5,11 @@ import shutil
 import subprocess
 import tempfile
 import time
-from six.moves import urllib
 import uuid
+from six.moves import urllib
 from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
+
 from test.service import ExternalService, SpawnedService
 from test.testutil import get_open_port
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 035d65a..f153d2d
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -12,38 +12,10 @@ from kafka.common import TopicPartition
 from kafka.conn import BrokerConnection, ConnectionStates
 from kafka.consumer.group import KafkaConsumer
 
-from test.fixtures import KafkaFixture, ZookeeperFixture
+from test.conftest import version
 from test.testutil import random_string
 
 
-@pytest.fixture(scope="module")
-def version():
-    if 'KAFKA_VERSION' not in os.environ:
-        return ()
-    return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-
-
-@pytest.fixture(scope="module")
-def zookeeper(version, request):
-    assert version
-    zk = ZookeeperFixture.instance()
-    def fin():
-        zk.close()
-    request.addfinalizer(fin)
-    return zk
-
-
-@pytest.fixture(scope="module")
-def kafka_broker(version, zookeeper, request):
-    assert version
-    k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
-                              partitions=4)
-    def fin():
-        k.close()
-    request.addfinalizer(fin)
-    return k
-
-
 @pytest.fixture
 def simple_client(kafka_broker):
     connect_str = 'localhost:' + str(kafka_broker.port)
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index 67cd83b..52b6b81
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -1,23 +1,43 @@
+import pytest
 import six
-from . import unittest
-
-from kafka.partitioner import (Murmur2Partitioner)
-
-class TestMurmurPartitioner(unittest.TestCase):
-    def test_hash_bytes(self):
-        p = Murmur2Partitioner(range(1000))
-        self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))
-
-    def test_hash_encoding(self):
-        p = Murmur2Partitioner(range(1000))
-        self.assertEqual(p.partition('test'), p.partition(u'test'))
-
-    def test_murmur2_java_compatibility(self):
-        p = Murmur2Partitioner(range(1000))
-        # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
-        self.assertEqual(681, p.partition(b''))
-        self.assertEqual(524, p.partition(b'a'))
-        self.assertEqual(434, p.partition(b'ab'))
-        self.assertEqual(107, p.partition(b'abc'))
-        self.assertEqual(566, p.partition(b'123456789'))
-        self.assertEqual(742, p.partition(b'\x00 '))
+
+from kafka.partitioner import Murmur2Partitioner
+from kafka.partitioner.default import DefaultPartitioner
+
+
+def test_default_partitioner():
+    partitioner = DefaultPartitioner()
+    all_partitions = list(range(100))
+    available = all_partitions
+    # partitioner should return the same partition for the same key
+    p1 = partitioner(b'foo', all_partitions, available)
+    p2 = partitioner(b'foo', all_partitions, available)
+    assert p1 == p2
+    assert p1 in all_partitions
+
+    # when key is None, choose one of available partitions
+    assert partitioner(None, all_partitions, [123]) == 123
+
+    # with fallback to all_partitions
+    assert partitioner(None, all_partitions, []) in all_partitions
+
+
+def test_hash_bytes():
+    p = Murmur2Partitioner(range(1000))
+    assert p.partition(bytearray(b'test')) == p.partition(b'test')
+
+
+def test_hash_encoding():
+    p = Murmur2Partitioner(range(1000))
+    assert p.partition('test') == p.partition(u'test')
+
+
+def test_murmur2_java_compatibility():
+    p = Murmur2Partitioner(range(1000))
+    # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
+    assert p.partition(b'') == 681
+    assert p.partition(b'a') == 524
+    assert p.partition(b'ab') == 434
+    assert p.partition(b'abc') == 107
+    assert p.partition(b'123456789') == 566
+    assert p.partition(b'\x00 ') == 742
diff --git a/test/test_producer.py b/test/test_producer.py
index 850cb80..b84feb4
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,257 +1,34 @@
-# -*- coding: utf-8 -*-
-
-import collections
-import logging
-import threading
-import time
-
-from mock import MagicMock, patch
-from . import unittest
-
-from kafka import SimpleClient, SimpleProducer, KeyedProducer
-from kafka.common import (
-    AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
-    ProduceResponsePayload, RetryOptions, TopicPartition
-)
-from kafka.producer.base import Producer, _send_upstream
-from kafka.protocol import CODEC_NONE
-
-from six.moves import queue, xrange
-
-
-class TestKafkaProducer(unittest.TestCase):
-    def test_producer_message_types(self):
-
-        producer = Producer(MagicMock())
-        topic = b"test-topic"
-        partition = 0
-
-        bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
-                          ('a', 'tuple'), {'a': 'dict'}, None,)
-        for m in bad_data_types:
-            with self.assertRaises(TypeError):
-                logging.debug("attempting to send message of type %s", type(m))
-                producer.send_messages(topic, partition, m)
-
-        good_data_types = (b'a string!',)
-        for m in good_data_types:
-            # This should not raise an exception
-            producer.send_messages(topic, partition, m)
-
-    def test_keyedproducer_message_types(self):
-        client = MagicMock()
-        client.get_partition_ids_for_topic.return_value = [0, 1]
-        producer = KeyedProducer(client)
-        topic = b"test-topic"
-        key = b"testkey"
-
-        bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
-                          ('a', 'tuple'), {'a': 'dict'},)
-        for m in bad_data_types:
-            with self.assertRaises(TypeError):
-                logging.debug("attempting to send message of type %s", type(m))
-                producer.send_messages(topic, key, m)
-
-        good_data_types = (b'a string!', None,)
-        for m in good_data_types:
-            # This should not raise an exception
-            producer.send_messages(topic, key, m)
-
-    def test_topic_message_types(self):
-        client = MagicMock()
-
-        def partitions(topic):
-            return [0, 1]
-
-        client.get_partition_ids_for_topic = partitions
-
-        producer = SimpleProducer(client, random_start=False)
-        topic = b"test-topic"
-        producer.send_messages(topic, b'hi')
-        assert client.send_produce_request.called
-
-    @patch('kafka.producer.base._send_upstream')
-    def test_producer_async_queue_overfilled(self, mock):
-        queue_size = 2
-        producer = Producer(MagicMock(), async=True,
-                            async_queue_maxsize=queue_size)
-
-        topic = b'test-topic'
-        partition = 0
-        message = b'test-message'
-
-        with self.assertRaises(AsyncProducerQueueFull):
-            message_list = [message] * (queue_size + 1)
-            producer.send_messages(topic, partition, *message_list)
-        self.assertEqual(producer.queue.qsize(), queue_size)
-        for _ in xrange(producer.queue.qsize()):
-            producer.queue.get()
-
-    def test_producer_sync_fail_on_error(self):
-        error = FailedPayloadsError('failure')
-        with patch.object(SimpleClient, 'load_metadata_for_topics'):
-            with patch.object(SimpleClient, 'ensure_topic_exists'):
-                with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
-                    with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
-
-                        client = SimpleClient(MagicMock())
-                        producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
-
-                        # This should not raise
-                        (response,) = producer.send_messages('foobar', b'test message')
-                        self.assertEqual(response, error)
-
-                        producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
-                        with self.assertRaises(FailedPayloadsError):
-                            producer.send_messages('foobar', b'test message')
-
-    def test_cleanup_is_not_called_on_stopped_producer(self):
-        producer = Producer(MagicMock(), async=True)
-        producer.stopped = True
-        with patch.object(producer, 'stop') as mocked_stop:
-            producer._cleanup_func(producer)
-            self.assertEqual(mocked_stop.call_count, 0)
-
-    def test_cleanup_is_called_on_running_producer(self):
-        producer = Producer(MagicMock(), async=True)
-        producer.stopped = False
-        with patch.object(producer, 'stop') as mocked_stop:
-            producer._cleanup_func(producer)
-            self.assertEqual(mocked_stop.call_count, 1)
-
-
-class TestKafkaProducerSendUpstream(unittest.TestCase):
-
-    def setUp(self):
-        self.client = MagicMock()
-        self.queue = queue.Queue()
-
-    def _run_process(self, retries_limit=3, sleep_timeout=1):
-        # run _send_upstream process with the queue
-        stop_event = threading.Event()
-        retry_options = RetryOptions(limit=retries_limit,
-                                     backoff_ms=50,
-                                     retry_on_timeouts=False)
-        self.thread = threading.Thread(
-            target=_send_upstream,
-            args=(self.queue, self.client, CODEC_NONE,
-                  0.3, # batch time (seconds)
-                  3, # batch length
-                  Producer.ACK_AFTER_LOCAL_WRITE,
-                  Producer.DEFAULT_ACK_TIMEOUT,
-                  retry_options,
-                  stop_event))
-        self.thread.daemon = True
-        self.thread.start()
-        time.sleep(sleep_timeout)
-        stop_event.set()
-
-    def test_wo_retries(self):
-
-        # lets create a queue and add 10 messages for 1 partition
-        for i in range(10):
-            self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
-
-        self._run_process()
-
-        # the queue should be void at the end of the test
-        self.assertEqual(self.queue.empty(), True)
-
-        # there should be 4 non-void cals:
-        # 3 batches of 3 msgs each + 1 batch of 1 message
-        self.assertEqual(self.client.send_produce_request.call_count, 4)
-
-    def test_first_send_failed(self):
-
-        # lets create a queue and add 10 messages for 10 different partitions
-        # to show how retries should work ideally
-        for i in range(10):
-            self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
-        # Mock offsets counter for closure
-        offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
-        self.client.is_first_time = True
-        def send_side_effect(reqs, *args, **kwargs):
-            if self.client.is_first_time:
-                self.client.is_first_time = False
-                return [FailedPayloadsError(req) for req in reqs]
-            responses = []
-            for req in reqs:
-                offset = offsets[req.topic][req.partition]
-                offsets[req.topic][req.partition] += len(req.messages)
-                responses.append(
-                    ProduceResponsePayload(req.topic, req.partition, 0, offset)
-                )
-            return responses
-
-        self.client.send_produce_request.side_effect = send_side_effect
-
-        self._run_process(2)
-
-        # the queue should be void at the end of the test
-        self.assertEqual(self.queue.empty(), True)
-
-        # there should be 5 non-void calls: 1st failed batch of 3 msgs
-        # plus 3 batches of 3 msgs each + 1 batch of 1 message
-        self.assertEqual(self.client.send_produce_request.call_count, 5)
-
-    def test_with_limited_retries(self):
-
-        # lets create a queue and add 10 messages for 10 different partitions
-        # to show how retries should work ideally
-        for i in range(10):
-            self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
-
-        def send_side_effect(reqs, *args, **kwargs):
-            return [FailedPayloadsError(req) for req in reqs]
-
-        self.client.send_produce_request.side_effect = send_side_effect
-
-        self._run_process(3, 3)
-
-        # the queue should be void at the end of the test
-        self.assertEqual(self.queue.empty(), True)
-
-        # there should be 16 non-void calls:
-        # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
-        # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
-        self.assertEqual(self.client.send_produce_request.call_count, 16)
-
-    def test_async_producer_not_leader(self):
-
-        for i in range(10):
-            self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
-        # Mock offsets counter for closure
-        offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
-        self.client.is_first_time = True
-        def send_side_effect(reqs, *args, **kwargs):
-            if self.client.is_first_time:
-                self.client.is_first_time = False
-                return [ProduceResponsePayload(req.topic, req.partition,
-                                               NotLeaderForPartitionError.errno, -1)
-                        for req in reqs]
-
-            responses = []
-            for req in reqs:
-                offset = offsets[req.topic][req.partition]
-                offsets[req.topic][req.partition] += len(req.messages)
-                responses.append(
-                    ProduceResponsePayload(req.topic, req.partition, 0, offset)
-                )
-            return responses
-
-        self.client.send_produce_request.side_effect = send_side_effect
-
-        self._run_process(2)
-
-        # the queue should be void at the end of the test
-        self.assertEqual(self.queue.empty(), True)
-
-        # there should be 5 non-void calls: 1st failed batch of 3 msgs
-        # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
-        self.assertEqual(self.client.send_produce_request.call_count, 5)
-
-    def tearDown(self):
-        for _ in xrange(self.queue.qsize()):
-            self.queue.get()
+import pytest
+
+from kafka import KafkaConsumer, KafkaProducer
+from test.conftest import version
+from test.testutil import random_string
+
+
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_end_to_end(kafka_broker):
+    connect_str = 'localhost:' + str(kafka_broker.port)
+    producer = KafkaProducer(bootstrap_servers=connect_str,
+                             max_block_ms=10000,
+                             value_serializer=str.encode)
+    consumer = KafkaConsumer(bootstrap_servers=connect_str,
+                             consumer_timeout_ms=10000,
+                             auto_offset_reset='earliest',
+                             value_deserializer=bytes.decode)
+
+    topic = random_string(5)
+
+    for i in range(1000):
+        producer.send(topic, 'msg %d' % i)
+    producer.flush()
+    producer.close()
+
+    consumer.subscribe([topic])
+    msgs = set()
+    for i in range(1000):
+        try:
+            msgs.add(next(consumer).value)
+        except StopIteration:
+            break
+
+    assert msgs == set(['msg %d' % i for i in range(1000)])
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
new file mode 100644
index 0000000..850cb80
--- /dev/null
+++ b/test/test_producer_legacy.py
@@ -0,0 +1,257 @@
+# -*- coding: utf-8 -*-
+
+import collections
+import logging
+import threading
+import time
+
+from mock import MagicMock, patch
+from . import unittest
+
+from kafka import SimpleClient, SimpleProducer, KeyedProducer
+from kafka.common import (
+    AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
+    ProduceResponsePayload, RetryOptions, TopicPartition
+)
+from kafka.producer.base import Producer, _send_upstream
+from kafka.protocol import CODEC_NONE
+
+from six.moves import queue, xrange
+
+
+class TestKafkaProducer(unittest.TestCase):
+    def test_producer_message_types(self):
+
+        producer = Producer(MagicMock())
+        topic = b"test-topic"
+        partition = 0
+
+        bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+                          ('a', 'tuple'), {'a': 'dict'}, None,)
+        for m in bad_data_types:
+            with self.assertRaises(TypeError):
+                logging.debug("attempting to send message of type %s", type(m))
+                producer.send_messages(topic, partition, m)
+
+        good_data_types = (b'a string!',)
+        for m in good_data_types:
+            # This should not raise an exception
+            producer.send_messages(topic, partition, m)
+
+    def test_keyedproducer_message_types(self):
+        client = MagicMock()
+        client.get_partition_ids_for_topic.return_value = [0, 1]
+        producer = KeyedProducer(client)
+        topic = b"test-topic"
+        key = b"testkey"
+
+        bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+                          ('a', 'tuple'), {'a': 'dict'},)
+        for m in bad_data_types:
+            with self.assertRaises(TypeError):
+                logging.debug("attempting to send message of type %s", type(m))
+                producer.send_messages(topic, key, m)
+
+        good_data_types = (b'a string!', None,)
+        for m in good_data_types:
+            # This should not raise an exception
+            producer.send_messages(topic, key, m)
+
+    def test_topic_message_types(self):
+        client = MagicMock()
+
+        def partitions(topic):
+            return [0, 1]
+
+        client.get_partition_ids_for_topic = partitions
+
+        producer = SimpleProducer(client, random_start=False)
+        topic = b"test-topic"
+        producer.send_messages(topic, b'hi')
+        assert client.send_produce_request.called
+
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled(self, mock):
+        queue_size = 2
+        producer = Producer(MagicMock(), async=True,
+                            async_queue_maxsize=queue_size)
+
+        topic = b'test-topic'
+        partition = 0
+        message = b'test-message'
+
+        with self.assertRaises(AsyncProducerQueueFull):
+            message_list = [message] * (queue_size + 1)
+            producer.send_messages(topic, partition, *message_list)
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()
+
+    def test_producer_sync_fail_on_error(self):
+        error = FailedPayloadsError('failure')
+        with patch.object(SimpleClient, 'load_metadata_for_topics'):
+            with patch.object(SimpleClient, 'ensure_topic_exists'):
+                with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+                    with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
+
+                        client = SimpleClient(MagicMock())
+                        producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+
+                        # This should not raise
+                        (response,) = producer.send_messages('foobar', b'test message')
+                        self.assertEqual(response, error)
+
+                        producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+                        with self.assertRaises(FailedPayloadsError):
+                            producer.send_messages('foobar', b'test message')
+
+    def test_cleanup_is_not_called_on_stopped_producer(self):
+        producer = Producer(MagicMock(), async=True)
+        producer.stopped = True
+        with patch.object(producer, 'stop') as mocked_stop:
+            producer._cleanup_func(producer)
+            self.assertEqual(mocked_stop.call_count, 0)
+
+    def test_cleanup_is_called_on_running_producer(self):
+        producer = Producer(MagicMock(), async=True)
+        producer.stopped = False
+        with patch.object(producer, 'stop') as mocked_stop:
+            producer._cleanup_func(producer)
+            self.assertEqual(mocked_stop.call_count, 1)
+
+
+class TestKafkaProducerSendUpstream(unittest.TestCase):
+
+    def setUp(self):
+        self.client = MagicMock()
+        self.queue = queue.Queue()
+
+    def _run_process(self, retries_limit=3, sleep_timeout=1):
+        # run _send_upstream process with the queue
+        stop_event = threading.Event()
+        retry_options = RetryOptions(limit=retries_limit,
+                                     backoff_ms=50,
+                                     retry_on_timeouts=False)
+        self.thread = threading.Thread(
+            target=_send_upstream,
+            args=(self.queue, self.client, CODEC_NONE,
+                  0.3, # batch time (seconds)
+                  3, # batch length
+                  Producer.ACK_AFTER_LOCAL_WRITE,
+                  Producer.DEFAULT_ACK_TIMEOUT,
+                  retry_options,
+                  stop_event))
+        self.thread.daemon = True
+        self.thread.start()
+        time.sleep(sleep_timeout)
+        stop_event.set()
+
+    def test_wo_retries(self):
+
+        # lets create a queue and add 10 messages for 1 partition
+        for i in range(10):
+            self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
+
+        self._run_process()
+
+        # the queue should be void at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 4 non-void cals:
+        # 3 batches of 3 msgs each + 1 batch of 1 message
+        self.assertEqual(self.client.send_produce_request.call_count, 4)
+
+    def test_first_send_failed(self):
+
+        # lets create a queue and add 10 messages for 10 different partitions
+        # to show how retries should work ideally
+        for i in range(10):
+            self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
+
+        # Mock offsets counter for closure
+        offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+        self.client.is_first_time = True
+        def send_side_effect(reqs, *args, **kwargs):
+            if self.client.is_first_time:
+                self.client.is_first_time = False
+                return [FailedPayloadsError(req) for req in reqs]
+            responses = []
+            for req in reqs:
+                offset = offsets[req.topic][req.partition]
+                offsets[req.topic][req.partition] += len(req.messages)
+                responses.append(
+                    ProduceResponsePayload(req.topic, req.partition, 0, offset)
+                )
+            return responses
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(2)
+
+        # the queue should be void at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 5 non-void calls: 1st failed batch of 3 msgs
+        # plus 3 batches of 3 msgs each + 1 batch of 1 message
+        self.assertEqual(self.client.send_produce_request.call_count, 5)
+
+    def test_with_limited_retries(self):
+
+        # lets create a queue and add 10 messages for 10 different partitions
+        # to show how retries should work ideally
+        for i in range(10):
+            self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
+
+        def send_side_effect(reqs, *args, **kwargs):
+            return [FailedPayloadsError(req) for req in reqs]
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(3, 3)
+
+        # the queue should be void at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 16 non-void calls:
+        # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
+        # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
+        self.assertEqual(self.client.send_produce_request.call_count, 16)
+
+    def test_async_producer_not_leader(self):
+
+        for i in range(10):
+            self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
+
+        # Mock offsets counter for closure
+        offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+        self.client.is_first_time = True
+        def send_side_effect(reqs, *args, **kwargs):
+            if self.client.is_first_time:
+                self.client.is_first_time = False
+                return [ProduceResponsePayload(req.topic, req.partition,
+                                               NotLeaderForPartitionError.errno, -1)
+                        for req in reqs]
+
+            responses = []
+            for req in reqs:
+                offset = offsets[req.topic][req.partition]
+                offsets[req.topic][req.partition] += len(req.messages)
+                responses.append(
+                    ProduceResponsePayload(req.topic, req.partition, 0, offset)
+                )
+            return responses
+
+        self.client.send_produce_request.side_effect = send_side_effect
+
+        self._run_process(2)
+
+        # the queue should be void at the end of the test
+        self.assertEqual(self.queue.empty(), True)
+
+        # there should be 5 non-void calls: 1st failed batch of 3 msgs
+        # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+        self.assertEqual(self.client.send_produce_request.call_count, 5)
+
+    def tearDown(self):
+        for _ in xrange(self.queue.qsize()):
+            self.queue.get()