Diffstat (limited to 'test')
-rw-r--r--   test/test_integration.py |  23
-rw-r--r--   test/test_unit.py        | 187
2 files changed, 200 insertions, 10 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index a10dae2..1f37ebf 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase):
     def setUpClass(cls):  # noqa
         cls.zk = ZookeeperFixture.instance()
         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server.host, cls.server.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -770,20 +770,23 @@ class TestConsumer(unittest.TestCase):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)
 
+
 class TestFailover(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
 
         zk_chroot = random_string(10)
-        replicas   = 2
+        replicas = 2
         partitions = 2
 
         # mini zookeeper, 2 kafka brokers
-        cls.zk      = ZookeeperFixture.instance()
-        kk_args     = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
         cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-        cls.client  = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+        hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+        cls.client = KafkaClient(hosts)
 
     @classmethod
     def tearDownClass(cls):
@@ -858,17 +861,19 @@ class TestFailover(unittest.TestCase):
             resp = producer.send_messages(random_string(10))
             if len(resp) > 0:
                 self.assertEquals(resp[0].error, 0)
-        time.sleep(1) # give it some time
+        time.sleep(1)  # give it some time
 
     def _kill_leader(self, topic, partition):
         leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
         broker = self.brokers[leader.nodeId]
         broker.close()
-        time.sleep(1) # give it some time
+        time.sleep(1)  # give it some time
         return broker
 
     def _count_messages(self, group, topic):
-        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+        client = KafkaClient(hosts)
         consumer = SimpleConsumer(client, group, topic, auto_commit=False)
         all_messages = []
         for message in consumer:
diff --git a/test/test_unit.py b/test/test_unit.py
index 3f3af66..4ea9442 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,13 +3,18 @@
 import random
 import struct
 import unittest
 
+from mock import patch
+
 from kafka.client import KafkaClient
-from kafka.common import ProduceRequest, FetchRequest
 from kafka.codec import (
     has_gzip, has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
 )
+from kafka.common import (
+    ProduceRequest, FetchRequest,
+    BrokerMetadata, PartitionMetadata, TopicAndPartition
+)
 
 ITERATIONS = 1000
 STRLEN = 100
@@ -217,5 +222,185 @@ class TestRequests(unittest.TestCase):
         self.assertEquals(enc, expect)
 
+
+class TestKafkaClient(unittest.TestCase):
+
+    def test_send_broker_unaware_request_fail(self):
+        'Tests that call fails when all hosts are unavailable'
+
+        from mock import MagicMock
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock()
+        }
+        # inject conns
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+            resp = client._send_broker_unaware_request(1, 'fake request')
+
+            self.assertIsNone(resp)
+
+            for key, conn in mocked_conns.iteritems():
+                conn.send.assert_called_with(1, 'fake request')
+
+    def test_send_broker_unaware_request(self):
+        'Tests that call fails when one of the host is available'
+
+        from mock import MagicMock
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock(),
+            ('kafka03', 9092): MagicMock()
+        }
+        # inject conns
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+        mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            print 'mock_get_conn: %s:%d=%s' % (host, port, mocked_conns[(host, port)])
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+            resp = client._send_broker_unaware_request(1, 'fake request')
+
+            self.assertEqual('valid response', resp)
+            mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
+        }
+        topics['topic_2'] = {
+            0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]),
+            1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, -1, [], [])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+
+        self.assertItemsEqual({}, client.topics_to_brokers)
+        self.assertRaises(
+            Exception,
+            client._get_leader_for_partition,
+            'topic_1', 0)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
+        }
+        leader = client._get_leader_for_partition('topic_1', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+            },
+            client.topics_to_brokers)
+
+    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @patch('kafka.client.KafkaConnection')
+    @patch('kafka.client.KafkaProtocol')
+    def test_client_load_metadata_noleader_partitions(self, protocol, conn):
+
+        conn.recv.return_value = 'response'  # anything but None
+
+        brokers = {}
+        brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
+        brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
+
+        topics = {}
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, -1, [], [])
+        }
+        topics['topic_2'] = {
+            0: PartitionMetadata('topic_2', 0, 0, [0, 1], []),
+            1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0])
+        }
+        protocol.decode_metadata_response.return_value = (brokers, topics)
+
+        client = KafkaClient(hosts='broker_1:4567')
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
+        self.assertRaises(
+            Exception,
+            client._get_leader_for_partition,
+            'topic_1', 0)
+
+        # calling _get_leader_for_partition (from any broker aware request)
+        # will try loading metadata again for the same topic
+        topics['topic_1'] = {
+            0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1])
+        }
+        leader = client._get_leader_for_partition('topic_1', 0)
+
+        self.assertEqual(brokers[0], leader)
+        self.assertItemsEqual(
+            {
+                TopicAndPartition('topic_1', 0): brokers[0],
+                TopicAndPartition('topic_2', 0): brokers[0],
+                TopicAndPartition('topic_2', 1): brokers[1]
+            },
+            client.topics_to_brokers)
 
 if __name__ == '__main__':
     unittest.main()
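Note: the change above moves KafkaClient from a (host, port) constructor to a single bootstrap string of comma-separated host:port pairs, which is what lets TestFailover hand every broker to one client. A minimal sketch of the new call style, based only on the calls visible in this diff; the broker names and ports below are illustrative placeholders, not part of the commit:

    from kafka.client import KafkaClient

    # single broker, hosts string as the first positional argument
    client = KafkaClient('localhost:9092')

    # several brokers, comma-separated (TestFailover builds this with ','.join(...))
    client = KafkaClient(hosts='kafka01:9092,kafka02:9092')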