Diffstat (limited to 'test')
-rw-r--r--  test/test_client_integration.py     16
-rw-r--r--  test/test_codec.py                   8
-rw-r--r--  test/test_conn.py                    6
-rw-r--r--  test/test_consumer.py                7
-rw-r--r--  test/test_consumer_integration.py  159
-rw-r--r--  test/test_failover_integration.py    2
-rw-r--r--  test/test_package.py                22
-rw-r--r--  test/test_producer_integration.py   20
8 files changed, 177 insertions(+), 63 deletions(-)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 0cd2c9e..cc60778 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -32,12 +32,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp, = self.client.send_fetch_request([fetch])
- self.assertEquals(fetch_resp.error, 0)
- self.assertEquals(fetch_resp.topic, self.topic)
- self.assertEquals(fetch_resp.partition, 0)
+ self.assertEqual(fetch_resp.error, 0)
+ self.assertEqual(fetch_resp.topic, self.topic)
+ self.assertEqual(fetch_resp.partition, 0)
messages = list(fetch_resp.messages)
- self.assertEquals(len(messages), 0)
+ self.assertEqual(len(messages), 0)
@kafka_versions("all")
def test_ensure_topic_exists(self):
@@ -58,10 +58,10 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
(resp,) = self.client.send_offset_commit_request(b"group", [req])
- self.assertEquals(resp.error, 0)
+ self.assertEqual(resp.error, 0)
req = OffsetFetchRequest(self.topic, 0)
(resp,) = self.client.send_offset_fetch_request(b"group", [req])
- self.assertEquals(resp.error, 0)
- self.assertEquals(resp.offset, 42)
- self.assertEquals(resp.metadata, b"") # Metadata isn't stored for now
+ self.assertEqual(resp.error, 0)
+ self.assertEqual(resp.offset, 42)
+ self.assertEqual(resp.metadata, b"") # Metadata isn't stored for now
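
Most of the churn in this patch is mechanical: assertEquals is a deprecated alias of assertEqual (it has warned since Python 3.2 and was removed entirely in 3.12). A minimal sketch of why the rename matters, runnable on Python 3 before 3.12:

    import unittest
    import warnings

    class AliasDemo(unittest.TestCase):
        def test_alias_warns(self):
            # assertEquals still works here, but emits a DeprecationWarning
            # (and is gone on Python 3.12+).
            with warnings.catch_warnings(record=True) as caught:
                warnings.simplefilter("always")
                self.assertEquals(1, 1)
            self.assertTrue(any(issubclass(w.category, DeprecationWarning)
                                for w in caught))

    if __name__ == '__main__':
        unittest.main()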
diff --git a/test/test_codec.py b/test/test_codec.py
index 0ea1074..2d7670a 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -15,14 +15,14 @@ class TestCodec(unittest.TestCase):
for i in xrange(1000):
s1 = random_string(100)
s2 = gzip_decode(gzip_encode(s1))
- self.assertEquals(s1, s2)
+ self.assertEqual(s1, s2)
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy(self):
for i in xrange(1000):
s1 = random_string(100)
s2 = snappy_decode(snappy_encode(s1))
- self.assertEquals(s1, s2)
+ self.assertEqual(s1, s2)
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_detect_xerial(self):
@@ -53,7 +53,7 @@ class TestCodec(unittest.TestCase):
+ struct.pack('!i', block_len) + random_snappy \
+ struct.pack('!i', block_len2) + random_snappy2 \
- self.assertEquals(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
+ self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_encode_xerial(self):
@@ -68,5 +68,5 @@ class TestCodec(unittest.TestCase):
to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
- self.assertEquals(compressed, to_ensure)
+ self.assertEqual(compressed, to_ensure)
diff --git a/test/test_conn.py b/test/test_conn.py
index 7b3beb7..2c8f3b2 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -120,7 +120,7 @@ class ConnTest(unittest.TestCase):
def test_recv(self):
- self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload'])
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
def test_recv__reconnects_on_dirty_conn(self):
@@ -151,8 +151,8 @@ class ConnTest(unittest.TestCase):
def test_recv__doesnt_consume_extra_data_in_stream(self):
# Just test that each call to recv returns a single payload
- self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload'])
- self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload2'])
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
def test_close__object_is_reusable(self):
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 9060919..7b8f370 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -2,9 +2,14 @@
from mock import MagicMock
from . import unittest
-from kafka.consumer import SimpleConsumer
+from kafka import SimpleConsumer, KafkaConsumer
+from kafka.common import KafkaConfigurationError
class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+
+ def test_broker_list_required(self):
+ with self.assertRaises(KafkaConfigurationError):
+ KafkaConsumer()
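
The new test_broker_list_required case pins down constructor-time validation: a KafkaConsumer built without brokers should raise KafkaConfigurationError immediately rather than failing on the first fetch. A sketch of that fail-fast pattern; ToyConsumer and its error message are illustrative, not kafka-python internals:

    class KafkaConfigurationError(Exception):
        pass

    class ToyConsumer(object):
        # Illustrative stand-in: only models the constructor-time check.
        def __init__(self, *topics, **configs):
            if not configs.get('metadata_broker_list'):
                # Fail fast at construction instead of on first use.
                raise KafkaConfigurationError(
                    'metadata_broker_list required to bootstrap the cluster')
            self.topics = topics

    try:
        ToyConsumer()  # no brokers configured
    except KafkaConfigurationError:
        print('raised as expected')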
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 2762008..ea32318 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,9 +1,12 @@
+import logging
import os
from six.moves import xrange
-from kafka import SimpleConsumer, MultiProcessConsumer, create_message
-from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
+from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
+from kafka.common import (
+ ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from test.fixtures import ZookeeperFixture, KafkaFixture
@@ -36,16 +39,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ create_message(self.msg(str(msg))) for msg in messages ]
produce = ProduceRequest(self.topic, partition, messages = messages)
resp, = self.client.send_produce_request([produce])
- self.assertEquals(resp.error, 0)
+ self.assertEqual(resp.error, 0)
return [ x.value for x in messages ]
def assert_message_count(self, messages, num_messages):
# Make sure we got them all
- self.assertEquals(len(messages), num_messages)
+ self.assertEqual(len(messages), num_messages)
# Make sure there are no duplicates
- self.assertEquals(len(set(messages)), num_messages)
+ self.assertEqual(len(set(messages)), num_messages)
+
+ def consumer(self, **kwargs):
+ if os.environ['KAFKA_VERSION'] == "0.8.0":
+ # Kafka 0.8.0 doesn't support offset commit/fetch requests, so hard-code auto_commit off
+ kwargs['auto_commit'] = False
+ else:
+ kwargs.setdefault('auto_commit', True)
+
+ consumer_class = kwargs.pop('consumer', SimpleConsumer)
+ group = kwargs.pop('group', self.id().encode('utf-8'))
+ topic = kwargs.pop('topic', self.topic)
+
+ if consumer_class == SimpleConsumer:
+ kwargs.setdefault('iter_timeout', 0)
+
+ return consumer_class(self.client, group, topic, **kwargs)
+
+ def kafka_consumer(self, **configs):
+ brokers = '%s:%d' % (self.server.host, self.server.port)
+ consumer = KafkaConsumer(self.topic,
+ metadata_broker_list=brokers,
+ **configs)
+ return consumer
@kafka_versions("all")
def test_simple_consumer(self):
@@ -114,9 +140,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- self.assertEquals(consumer.pending(), 20)
- self.assertEquals(consumer.pending(partitions=[0]), 10)
- self.assertEquals(consumer.pending(partitions=[1]), 10)
+ consumer = self.consumer()
+
+ self.assertEqual(consumer.pending(), 20)
+ self.assertEqual(consumer.pending(partitions=[0]), 10)
+ self.assertEqual(consumer.pending(partitions=[1]), 10)
# move to the last message, so one partition should have 1 pending
# message and the other 0
@@ -175,9 +203,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
- self.assertEquals(consumer.pending(), 20)
- self.assertEquals(consumer.pending(partitions=[0]), 10)
- self.assertEquals(consumer.pending(partitions=[1]), 10)
+ self.assertEqual(consumer.pending(), 20)
+ self.assertEqual(consumer.pending(partitions=[0]), 10)
+ self.assertEqual(consumer.pending(partitions=[1]), 10)
consumer.stop()
@@ -225,7 +253,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Consume giant message successfully
message = big_consumer.get_message(block=False, timeout=10)
self.assertIsNotNone(message)
- self.assertEquals(message.message.value, huge_message)
+ self.assertEqual(message.message.value, huge_message)
big_consumer.stop()
@@ -273,20 +301,101 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
messages = [ message for message in consumer ]
- self.assertEquals(len(messages), 2)
+ self.assertEqual(len(messages), 2)
- def consumer(self, **kwargs):
- if os.environ['KAFKA_VERSION'] == "0.8.0":
- # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
- kwargs['auto_commit'] = False
- else:
- kwargs.setdefault('auto_commit', True)
+ @kafka_versions("all")
+ def test_kafka_consumer(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
- consumer_class = kwargs.pop('consumer', SimpleConsumer)
- group = kwargs.pop('group', self.id().encode('utf-8'))
- topic = kwargs.pop('topic', self.topic)
+ # Start a consumer
+ consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer_timeout_ms=5000)
+ n = 0
+ messages = {0: set(), 1: set()}
+ logging.debug("kafka consumer offsets: %s" % consumer.offsets())
+ for m in consumer:
+ logging.debug("Consumed message %s" % repr(m))
+ n += 1
+ messages[m.partition].add(m.offset)
+ if n >= 200:
+ break
+
+ self.assertEqual(len(messages[0]), 100)
+ self.assertEqual(len(messages[1]), 100)
- if consumer_class == SimpleConsumer:
- kwargs.setdefault('iter_timeout', 0)
+ @kafka_versions("all")
+ def test_kafka_consumer__blocking(self):
+ TIMEOUT_MS = 500
+ consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer_timeout_ms=TIMEOUT_MS)
- return consumer_class(self.client, group, topic, **kwargs)
+ # Nothing in queue: next() should block for the timeout, then raise ConsumerTimeout
+ with Timer() as t:
+ with self.assertRaises(ConsumerTimeout):
+ msg = consumer.next()
+ self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
+
+ self.send_messages(0, range(0, 10))
+
+ # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
+ messages = set()
+ with Timer() as t:
+ for i in range(5):
+ msg = consumer.next()
+ messages.add((msg.partition, msg.offset))
+ self.assertEqual(len(messages), 5)
+ self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
+
+ # Ask for 10 messages, get only 5 back, then block for the timeout
+ messages = set()
+ with Timer() as t:
+ with self.assertRaises(ConsumerTimeout):
+ for i in range(10):
+ msg = consumer.next()
+ messages.add((msg.partition, msg.offset))
+ self.assertEqual(len(messages), 5)
+ self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
+
+ @kafka_versions("0.8.1", "0.8.1.1")
+ def test_kafka_consumer__offset_commit_resume(self):
+ GROUP_ID = random_string(10)
+
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer1 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ auto_offset_reset='smallest',
+ )
+
+ # Grab the first 195 messages
+ output_msgs1 = []
+ for _ in xrange(195):
+ m = consumer1.next()
+ output_msgs1.append(m)
+ consumer1.task_done(m)
+ self.assert_message_count(output_msgs1, 195)
+
+ # The total offset across both partitions should be at 180
+ consumer2 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ consumer_timeout_ms = 100,
+ auto_offset_reset='smallest',
+ )
+
+ # 181-200
+ output_msgs2 = []
+ with self.assertRaises(ConsumerTimeout):
+ while True:
+ m = consumer2.next()
+ output_msgs2.append(m)
+ self.assert_message_count(output_msgs2, 20)
+ self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
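
The blocking tests above time each next() call with a Timer context manager and compare t.interval against the timeout. That helper comes from the repo's test utilities and isn't part of this diff; a minimal equivalent, assuming only that it records wall-clock seconds:

    import time

    class Timer(object):
        # Records seconds elapsed inside the `with` block as .interval.
        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc_info):
            self.interval = time.time() - self.start

    # Usage mirroring test_kafka_consumer__blocking:
    with Timer() as t:
        time.sleep(0.1)
    assert t.interval >= 0.1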
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index d307d41..ca71f2d 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -121,7 +121,7 @@ class TestFailover(KafkaIntegrationTestCase):
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
resp = producer.send_messages(topic, partition, random_string(10))
if len(resp) > 0:
- self.assertEquals(resp[0].error, 0)
+ self.assertEqual(resp[0].error, 0)
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
def _kill_leader(self, topic, partition):
diff --git a/test/test_package.py b/test/test_package.py
index 9b69a7c..e91753c 100644
--- a/test/test_package.py
+++ b/test/test_package.py
@@ -3,27 +3,27 @@ from . import unittest
class TestPackage(unittest.TestCase):
def test_top_level_namespace(self):
import kafka as kafka1
- self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
- self.assertEquals(kafka1.client.__name__, "kafka.client")
- self.assertEquals(kafka1.codec.__name__, "kafka.codec")
+ self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient")
+ self.assertEqual(kafka1.client.__name__, "kafka.client")
+ self.assertEqual(kafka1.codec.__name__, "kafka.codec")
def test_submodule_namespace(self):
import kafka.client as client1
- self.assertEquals(client1.__name__, "kafka.client")
- self.assertEquals(client1.KafkaClient.__name__, "KafkaClient")
+ self.assertEqual(client1.__name__, "kafka.client")
+ self.assertEqual(client1.KafkaClient.__name__, "KafkaClient")
from kafka import client as client2
- self.assertEquals(client2.__name__, "kafka.client")
- self.assertEquals(client2.KafkaClient.__name__, "KafkaClient")
+ self.assertEqual(client2.__name__, "kafka.client")
+ self.assertEqual(client2.KafkaClient.__name__, "KafkaClient")
from kafka.client import KafkaClient as KafkaClient1
- self.assertEquals(KafkaClient1.__name__, "KafkaClient")
+ self.assertEqual(KafkaClient1.__name__, "KafkaClient")
from kafka.codec import gzip_encode as gzip_encode1
- self.assertEquals(gzip_encode1.__name__, "gzip_encode")
+ self.assertEqual(gzip_encode1.__name__, "gzip_encode")
from kafka import KafkaClient as KafkaClient2
- self.assertEquals(KafkaClient2.__name__, "KafkaClient")
+ self.assertEqual(KafkaClient2.__name__, "KafkaClient")
from kafka.codec import snappy_encode
- self.assertEquals(snappy_encode.__name__, "snappy_encode")
+ self.assertEqual(snappy_encode.__name__, "snappy_encode")
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index d68af72..4331d23 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -251,7 +251,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
resp = producer.send_messages(self.topic, self.msg("one"))
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
producer.stop()
@@ -301,7 +301,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
)
# Batch mode is async. No ack
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
# It hasn't sent yet
self.assert_fetch_offset(0, start_offset0, [])
@@ -314,7 +314,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
)
# Batch mode is async. No ack
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
self.assert_fetch_offset(0, start_offset0, [
self.msg("one"),
@@ -350,7 +350,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
)
# Batch mode is async. No ack
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
# It hasn't sent yet
self.assert_fetch_offset(0, start_offset0, [])
@@ -363,7 +363,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
)
# Batch mode is async. No ack
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
# Wait the timeout out
time.sleep(5)
@@ -389,7 +389,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = SimpleProducer(self.client, async=True)
resp = producer.send_messages(self.topic, self.msg("one"))
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
@@ -402,7 +402,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
@@ -429,9 +429,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
- self.assertEquals(resp.error, 0)
- self.assertEquals(resp.partition, partition)
+ self.assertEqual(resp.error, 0)
+ self.assertEqual(resp.partition, partition)
messages = [ x.message.value for x in resp.messages ]
self.assertEqual(messages, expected_messages)
- self.assertEquals(resp.highwaterMark, start_offset+len(expected_messages))
+ self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))