summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_conn.py17
-rw-r--r--test/test_consumer_integration.py18
-rw-r--r--test/test_partitioner.py23
-rw-r--r--test/test_producer.py38
-rw-r--r--test/test_producer_integration.py22
5 files changed, 116 insertions, 2 deletions
diff --git a/test/test_conn.py b/test/test_conn.py
index 2b70344..1bdfc1e 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -165,6 +165,23 @@ class ConnTest(unittest.TestCase):
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
+ def test_get_connected_socket(self):
+ s = self.conn.get_connected_socket()
+
+ self.assertEqual(s, self.MockCreateConn())
+
+ def test_get_connected_socket_on_dirty_conn(self):
+ # Dirty the connection
+ try:
+ self.conn._raise_connection_error()
+ except ConnectionError:
+ pass
+
+ # Test that get_connected_socket tries to connect
+ self.assertEqual(self.MockCreateConn.call_count, 0)
+ self.conn.get_connected_socket()
+ self.assertEqual(self.MockCreateConn.call_count, 1)
+
def test_close__object_is_reusable(self):
# test that sending to a closed connection
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 52b3e85..fee53f5 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -204,6 +204,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 1)
+ # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
+ # second, get 5 back, no blocking
+ self.send_messages(0, range(0, 5))
+ with Timer() as t:
+ messages = consumer.get_messages(count=10, block=1, timeout=1)
+ self.assert_message_count(messages, 5)
+ self.assertLessEqual(t.interval, 1)
+
consumer.stop()
@kafka_versions("all")
@@ -272,6 +280,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 1)
+ # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
+ # second, get at least one back, no blocking
+ self.send_messages(0, range(0, 5))
+ with Timer() as t:
+ messages = consumer.get_messages(count=10, block=1, timeout=1)
+ received_message_count = len(messages)
+ self.assertGreaterEqual(received_message_count, 1)
+ self.assert_message_count(messages, received_message_count)
+ self.assertLessEqual(t.interval, 1)
+
consumer.stop()
@kafka_versions("all")
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
new file mode 100644
index 0000000..67cd83b
--- /dev/null
+++ b/test/test_partitioner.py
@@ -0,0 +1,23 @@
+import six
+from . import unittest
+
+from kafka.partitioner import (Murmur2Partitioner)
+
+class TestMurmurPartitioner(unittest.TestCase):
+ def test_hash_bytes(self):
+ p = Murmur2Partitioner(range(1000))
+ self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))
+
+ def test_hash_encoding(self):
+ p = Murmur2Partitioner(range(1000))
+ self.assertEqual(p.partition('test'), p.partition(u'test'))
+
+ def test_murmur2_java_compatibility(self):
+ p = Murmur2Partitioner(range(1000))
+ # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
+ self.assertEqual(681, p.partition(b''))
+ self.assertEqual(524, p.partition(b'a'))
+ self.assertEqual(434, p.partition(b'ab'))
+ self.assertEqual(107, p.partition(b'abc'))
+ self.assertEqual(566, p.partition(b'123456789'))
+ self.assertEqual(742, p.partition(b'\x00 '))
diff --git a/test/test_producer.py b/test/test_producer.py
index 27272f6..3c026e8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,7 +7,7 @@ import time
from mock import MagicMock, patch
from . import unittest
-from kafka import KafkaClient, SimpleProducer
+from kafka import KafkaClient, SimpleProducer, KeyedProducer
from kafka.common import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
ProduceResponse, RetryOptions, TopicAndPartition
@@ -33,7 +33,8 @@ class TestKafkaProducer(unittest.TestCase):
topic = b"test-topic"
partition = 0
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'}, None,)
for m in bad_data_types:
with self.assertRaises(TypeError):
logging.debug("attempting to send message of type %s", type(m))
@@ -44,6 +45,25 @@ class TestKafkaProducer(unittest.TestCase):
# This should not raise an exception
producer.send_messages(topic, partition, m)
+ def test_keyedproducer_message_types(self):
+ client = MagicMock()
+ client.get_partition_ids_for_topic.return_value = [0, 1]
+ producer = KeyedProducer(client)
+ topic = b"test-topic"
+ key = b"testkey"
+
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'},)
+ for m in bad_data_types:
+ with self.assertRaises(TypeError):
+ logging.debug("attempting to send message of type %s", type(m))
+ producer.send_messages(topic, key, m)
+
+ good_data_types = (b'a string!', None,)
+ for m in good_data_types:
+ # This should not raise an exception
+ producer.send_messages(topic, key, m)
+
def test_topic_message_types(self):
client = MagicMock()
@@ -91,6 +111,20 @@ class TestKafkaProducer(unittest.TestCase):
with self.assertRaises(FailedPayloadsError):
producer.send_messages('foobar', b'test message')
+ def test_cleanup_stop_is_called_on_not_stopped_object(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = True
+ with patch('kafka.producer.base.Producer.stop') as base_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(base_stop.call_count, 0)
+
+ def test_cleanup_stop_is_not_called_on_stopped_object(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = False
+ with patch('kafka.producer.base.Producer.stop') as base_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(base_stop.call_count, 1)
+
class TestKafkaProducerSendUpstream(unittest.TestCase):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index abf34c3..46b6851 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# KeyedProducer Tests #
############################
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_keyedproducer_null_payload(self):
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+ start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+ producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+ key = "test"
+
+ resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
+ self.assert_produce_response(resp, start_offsets[0])
+ resp = producer.send_messages(self.topic, self.key("key2"), None)
+ self.assert_produce_response(resp, start_offsets[1])
+ resp = producer.send_messages(self.topic, self.key("key3"), None)
+ self.assert_produce_response(resp, start_offsets[0]+1)
+ resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
+ self.assert_produce_response(resp, start_offsets[1]+1)
+
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ])
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ])
+
+ producer.stop()
+
@kafka_versions("all")
def test_round_robin_partitioner(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)