diff options
| author | Swen Wenzel <5111028+swenzel@users.noreply.github.com> | 2020-09-17 18:17:35 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-09-17 09:17:35 -0700 |
| commit | 16a0b3155fdeebe80295fcfb0f32d75af74dcb1a (patch) | |
| tree | df9b10de0ee8f9b40f28a74276182c096e7bbabf /test | |
| parent | e485a6ee2a1f05f2333e22b0fbdbafb12badaf3f (diff) | |
| download | kafka-python-16a0b3155fdeebe80295fcfb0f32d75af74dcb1a.tar.gz | |
Feature: delete consumergroups (#2040)
* Add consumergroup related errors
* Add DeleteGroups to protocol.admin
* Implement delete_groups feature on KafkaAdminClient
Diffstat (limited to 'test')
| -rw-r--r-- | test/test_admin_integration.py | 78 |
1 files changed, 77 insertions, 1 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index dc04537..06c40a2 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -7,7 +7,7 @@ from time import time, sleep from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) -from kafka.errors import (NoError, GroupCoordinatorNotAvailableError) +from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -142,6 +142,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): with pytest.raises(ValueError): configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') def test_describe_consumer_group_does_not_exist(kafka_admin_client): """Tests that the describe consumer group call fails if the group coordinator is not available @@ -149,6 +150,7 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client): with pytest.raises(GroupCoordinatorNotAvailableError): group_description = kafka_admin_client.describe_consumer_groups(['test']) + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): """Tests that the describe consumer group call returns valid consumer group information @@ -236,3 +238,77 @@ def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_facto stop[c].set() threads[c].join() threads[c] = None + + +@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") +def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages): + random_group_id = 'test-group-' + random_string(6) + group1 = random_group_id + "_1" + group2 = random_group_id + "_2" + group3 = random_group_id + "_3" + + send_messages(range(0, 100), partition=0) + consumer1 = kafka_consumer_factory(group_id=group1) + next(consumer1) + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id=group2) + next(consumer2) + consumer2.close() + + consumer3 = kafka_consumer_factory(group_id=group3) + next(consumer3) + consumer3.close() + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 in consumergroups + assert group2 in consumergroups + assert group3 in consumergroups + + delete_results = { + group_id: error + for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2]) + } + assert delete_results[group1] == NoError + assert delete_results[group2] == NoError + assert group3 not in delete_results + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 not in consumergroups + assert group2 not in consumergroups + assert group3 in consumergroups + + +@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") +def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): + random_group_id = 'test-group-' + random_string(6) + group1 = random_group_id + "_1" + group2 = random_group_id + "_2" + group3 = random_group_id + "_3" + + send_messages(range(0, 100), partition=0) + consumer1 = kafka_consumer_factory(group_id=group1) + next(consumer1) + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id=group2) + next(consumer2) + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 in consumergroups + assert group2 in consumergroups + assert group3 not in consumergroups + + delete_results = { + group_id: error + for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3]) + } + + assert delete_results[group1] == NoError + assert delete_results[group2] == NonEmptyGroupError + assert delete_results[group3] == GroupIdNotFoundError + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert group1 not in consumergroups + assert group2 in consumergroups + assert group3 not in consumergroups |
