diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_conn.py | 17 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 18 | ||||
-rw-r--r-- | test/test_partitioner.py | 23 | ||||
-rw-r--r-- | test/test_producer.py | 38 | ||||
-rw-r--r-- | test/test_producer_integration.py | 22 |
5 files changed, 116 insertions, 2 deletions
diff --git a/test/test_conn.py b/test/test_conn.py index 2b70344..1bdfc1e 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -165,6 +165,23 @@ class ConnTest(unittest.TestCase): self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2']) + def test_get_connected_socket(self): + s = self.conn.get_connected_socket() + + self.assertEqual(s, self.MockCreateConn()) + + def test_get_connected_socket_on_dirty_conn(self): + # Dirty the connection + try: + self.conn._raise_connection_error() + except ConnectionError: + pass + + # Test that get_connected_socket tries to connect + self.assertEqual(self.MockCreateConn.call_count, 0) + self.conn.get_connected_socket() + self.assertEqual(self.MockCreateConn.call_count, 1) + def test_close__object_is_reusable(self): # test that sending to a closed connection diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 52b3e85..fee53f5 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -204,6 +204,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertGreaterEqual(t.interval, 1) + # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 + # second, get 5 back, no blocking + self.send_messages(0, range(0, 5)) + with Timer() as t: + messages = consumer.get_messages(count=10, block=1, timeout=1) + self.assert_message_count(messages, 5) + self.assertLessEqual(t.interval, 1) + consumer.stop() @kafka_versions("all") @@ -272,6 +280,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertGreaterEqual(t.interval, 1) + # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 + # second, get at least one back, no blocking + self.send_messages(0, range(0, 5)) + with Timer() as t: + messages = consumer.get_messages(count=10, block=1, timeout=1) + received_message_count = len(messages) + self.assertGreaterEqual(received_message_count, 1) + self.assert_message_count(messages, received_message_count) + self.assertLessEqual(t.interval, 1) + consumer.stop() @kafka_versions("all") diff --git a/test/test_partitioner.py b/test/test_partitioner.py new file mode 100644 index 0000000..67cd83b --- /dev/null +++ b/test/test_partitioner.py @@ -0,0 +1,23 @@ +import six +from . import unittest + +from kafka.partitioner import (Murmur2Partitioner) + +class TestMurmurPartitioner(unittest.TestCase): + def test_hash_bytes(self): + p = Murmur2Partitioner(range(1000)) + self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test')) + + def test_hash_encoding(self): + p = Murmur2Partitioner(range(1000)) + self.assertEqual(p.partition('test'), p.partition(u'test')) + + def test_murmur2_java_compatibility(self): + p = Murmur2Partitioner(range(1000)) + # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner + self.assertEqual(681, p.partition(b'')) + self.assertEqual(524, p.partition(b'a')) + self.assertEqual(434, p.partition(b'ab')) + self.assertEqual(107, p.partition(b'abc')) + self.assertEqual(566, p.partition(b'123456789')) + self.assertEqual(742, p.partition(b'\x00 ')) diff --git a/test/test_producer.py b/test/test_producer.py index 27272f6..3c026e8 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,7 +7,7 @@ import time from mock import MagicMock, patch from . import unittest -from kafka import KafkaClient, SimpleProducer +from kafka import KafkaClient, SimpleProducer, KeyedProducer from kafka.common import ( AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, ProduceResponse, RetryOptions, TopicAndPartition @@ -33,7 +33,8 @@ class TestKafkaProducer(unittest.TestCase): topic = b"test-topic" partition = 0 - bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'}) + bad_data_types = (u'你怎么样?', 12, ['a', 'list'], + ('a', 'tuple'), {'a': 'dict'}, None,) for m in bad_data_types: with self.assertRaises(TypeError): logging.debug("attempting to send message of type %s", type(m)) @@ -44,6 +45,25 @@ class TestKafkaProducer(unittest.TestCase): # This should not raise an exception producer.send_messages(topic, partition, m) + def test_keyedproducer_message_types(self): + client = MagicMock() + client.get_partition_ids_for_topic.return_value = [0, 1] + producer = KeyedProducer(client) + topic = b"test-topic" + key = b"testkey" + + bad_data_types = (u'你怎么样?', 12, ['a', 'list'], + ('a', 'tuple'), {'a': 'dict'},) + for m in bad_data_types: + with self.assertRaises(TypeError): + logging.debug("attempting to send message of type %s", type(m)) + producer.send_messages(topic, key, m) + + good_data_types = (b'a string!', None,) + for m in good_data_types: + # This should not raise an exception + producer.send_messages(topic, key, m) + def test_topic_message_types(self): client = MagicMock() @@ -91,6 +111,20 @@ class TestKafkaProducer(unittest.TestCase): with self.assertRaises(FailedPayloadsError): producer.send_messages('foobar', b'test message') + def test_cleanup_stop_is_called_on_not_stopped_object(self): + producer = Producer(MagicMock(), async=True) + producer.stopped = True + with patch('kafka.producer.base.Producer.stop') as base_stop: + producer._cleanup_func(producer) + self.assertEqual(base_stop.call_count, 0) + + def test_cleanup_stop_is_not_called_on_stopped_object(self): + producer = Producer(MagicMock(), async=True) + producer.stopped = False + with patch('kafka.producer.base.Producer.stop') as base_stop: + producer._cleanup_func(producer) + self.assertEqual(base_stop.call_count, 1) + class TestKafkaProducerSendUpstream(unittest.TestCase): diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index abf34c3..46b6851 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # KeyedProducer Tests # ############################ + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_keyedproducer_null_payload(self): + partitions = self.client.get_partition_ids_for_topic(self.topic) + start_offsets = [self.current_offset(self.topic, p) for p in partitions] + + producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) + key = "test" + + resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) + self.assert_produce_response(resp, start_offsets[0]) + resp = producer.send_messages(self.topic, self.key("key2"), None) + self.assert_produce_response(resp, start_offsets[1]) + resp = producer.send_messages(self.topic, self.key("key3"), None) + self.assert_produce_response(resp, start_offsets[0]+1) + resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) + self.assert_produce_response(resp, start_offsets[1]+1) + + self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ]) + self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ]) + + producer.stop() + @kafka_versions("all") def test_round_robin_partitioner(self): partitions = self.client.get_partition_ids_for_topic(self.topic) |