diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client.py | 6 | ||||
-rw-r--r-- | test/test_consumer_group.py | 30 |
2 files changed, 28 insertions, 8 deletions
diff --git a/test/test_client.py b/test/test_client.py index 5a35c83..a53fce1 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -11,7 +11,7 @@ from kafka.common import ( BrokerMetadata, TopicPartition, KafkaUnavailableError, LeaderNotAvailableError, UnknownTopicOrPartitionError, - KafkaTimeoutError, ConnectionError + KafkaTimeoutError, ConnectionError, FailedPayloadsError ) from kafka.conn import KafkaConnection from kafka.future import Future @@ -361,7 +361,7 @@ class TestSimpleClient(unittest.TestCase): "topic_noleader", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(LeaderNotAvailableError): + with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) @patch('kafka.SimpleClient._get_conn') @@ -386,7 +386,7 @@ class TestSimpleClient(unittest.TestCase): "topic_doesnt_exist", 0, [create_message("a"), create_message("b")])] - with self.assertRaises(UnknownTopicOrPartitionError): + with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) def test_timeout(self): diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 34b1be4..5fcfbe2 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -17,10 +17,13 @@ from test.conftest import version from test.testutil import random_string +def get_connect_str(kafka_broker): + return 'localhost:' + str(kafka_broker.port) + + @pytest.fixture def simple_client(kafka_broker): - connect_str = 'localhost:' + str(kafka_broker.port) - return SimpleClient(connect_str) + return SimpleClient(get_connect_str(kafka_broker)) @pytest.fixture @@ -37,8 +40,7 @@ def test_consumer(kafka_broker, version): if version >= (0, 8, 2) and version < (0, 9): topic(simple_client(kafka_broker)) - connect_str = 'localhost:' + str(kafka_broker.port) - consumer = KafkaConsumer(bootstrap_servers=connect_str) + consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) consumer.poll(500) assert len(consumer._client._conns) > 0 node_id = list(consumer._client._conns.keys())[0] @@ -49,7 +51,7 @@ def test_consumer(kafka_broker, version): @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") def test_group(kafka_broker, topic): num_partitions = 4 - connect_str = 'localhost:' + str(kafka_broker.port) + connect_str = get_connect_str(kafka_broker) consumers = {} stop = {} threads = {} @@ -120,6 +122,24 @@ def test_group(kafka_broker, topic): threads[c].join() +@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +def test_paused(kafka_broker, topic): + consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) + topics = [TopicPartition(topic, 1)] + consumer.assign(topics) + assert set(topics) == consumer.assignment() + assert set() == consumer.paused() + + consumer.pause(topics[0]) + assert set([topics[0]]) == consumer.paused() + + consumer.resume(topics[0]) + assert set() == consumer.paused() + + consumer.unsubscribe() + assert set() == consumer.paused() + + @pytest.fixture def conn(mocker): conn = mocker.patch('kafka.client_async.BrokerConnection') |