diff options
author | Jeppe Andersen <2197398+jlandersen@users.noreply.github.com> | 2019-10-11 20:46:52 +0200 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-10-11 11:46:52 -0700 |
commit | 6d3800ca9f45fd953689a1787fc90a5e566e34ea (patch) | |
tree | f47705bfa7ba965a1e505cb3714116eb36771e20 /test | |
parent | 84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1 (diff) | |
download | kafka-python-6d3800ca9f45fd953689a1787fc90a5e566e34ea.tar.gz |
Fix describe config for multi-broker clusters (#1869)
* Fix describe config for multi-broker clusters
Currently all describe config requests are sent to "least loaded node". Requests for broker configs must, however, be sent to the specific broker, otherwise an error is returned. Only topic requests can be handled by any node.
This changes the logic to send all describe config requests to the specific broker.
Diffstat (limited to 'test')
-rw-r--r-- | test/test_admin_integration.py | 57 |
1 files changed, 56 insertions, 1 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 3efa021..0b041b2 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -3,7 +3,8 @@ import pytest from test.testutil import env_kafka_version from kafka.errors import NoError -from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL +from kafka.admin import ( + ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -80,3 +81,57 @@ def test_create_describe_delete_acls(kafka_admin_client): assert error is NoError assert len(acls) == 0 + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") +def test_describe_configs_broker_resource_returns_configs(kafka_admin_client): + """Tests that describe config returns configs for broker + """ + broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + + assert len(configs) == 1 + assert configs[0].resources[0][2] == ConfigResourceType.BROKER + assert configs[0].resources[0][3] == str(broker_id) + assert len(configs[0].resources[0][4]) > 1 + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") +def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client): + """Tests that describe config returns configs for topic + """ + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)]) + + assert len(configs) == 1 + assert configs[0].resources[0][2] == ConfigResourceType.TOPIC + assert configs[0].resources[0][3] == topic + assert len(configs[0].resources[0][4]) > 1 + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") +def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client): + """Tests that describe config returns configs for mixed resource types (topic + broker) + """ + broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId + configs = kafka_admin_client.describe_configs([ + ConfigResource(ConfigResourceType.TOPIC, topic), + ConfigResource(ConfigResourceType.BROKER, broker_id)]) + + assert len(configs) == 2 + + for config in configs: + assert (config.resources[0][2] == ConfigResourceType.TOPIC + and config.resources[0][3] == topic) or \ + (config.resources[0][2] == ConfigResourceType.BROKER + and config.resources[0][3] == str(broker_id)) + assert len(config.resources[0][4]) > 1 + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") +def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): + """Tests that describe config raises exception on non-integer broker id + """ + broker_id = "str" + + with pytest.raises(ValueError): + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) |