summaryrefslogtreecommitdiff
path: root/test/test_admin_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_admin_integration.py')
-rw-r--r--test/test_admin_integration.py78
1 files changed, 77 insertions, 1 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index dc04537..06c40a2 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -7,7 +7,7 @@ from time import time, sleep
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
-from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
+from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -142,6 +142,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
"""Tests that the describe consumer group call fails if the group coordinator is not available
@@ -149,6 +150,7 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client):
with pytest.raises(GroupCoordinatorNotAvailableError):
group_description = kafka_admin_client.describe_consumer_groups(['test'])
+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
"""Tests that the describe consumer group call returns valid consumer group information
@@ -236,3 +238,77 @@ def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_facto
stop[c].set()
threads[c].join()
threads[c] = None
+
+
+@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
+def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages):
+ random_group_id = 'test-group-' + random_string(6)
+ group1 = random_group_id + "_1"
+ group2 = random_group_id + "_2"
+ group3 = random_group_id + "_3"
+
+ send_messages(range(0, 100), partition=0)
+ consumer1 = kafka_consumer_factory(group_id=group1)
+ next(consumer1)
+ consumer1.close()
+
+ consumer2 = kafka_consumer_factory(group_id=group2)
+ next(consumer2)
+ consumer2.close()
+
+ consumer3 = kafka_consumer_factory(group_id=group3)
+ next(consumer3)
+ consumer3.close()
+
+ consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
+ assert group1 in consumergroups
+ assert group2 in consumergroups
+ assert group3 in consumergroups
+
+ delete_results = {
+ group_id: error
+ for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2])
+ }
+ assert delete_results[group1] == NoError
+ assert delete_results[group2] == NoError
+ assert group3 not in delete_results
+
+ consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
+ assert group1 not in consumergroups
+ assert group2 not in consumergroups
+ assert group3 in consumergroups
+
+
+@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
+def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages):
+ random_group_id = 'test-group-' + random_string(6)
+ group1 = random_group_id + "_1"
+ group2 = random_group_id + "_2"
+ group3 = random_group_id + "_3"
+
+ send_messages(range(0, 100), partition=0)
+ consumer1 = kafka_consumer_factory(group_id=group1)
+ next(consumer1)
+ consumer1.close()
+
+ consumer2 = kafka_consumer_factory(group_id=group2)
+ next(consumer2)
+
+ consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
+ assert group1 in consumergroups
+ assert group2 in consumergroups
+ assert group3 not in consumergroups
+
+ delete_results = {
+ group_id: error
+ for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3])
+ }
+
+ assert delete_results[group1] == NoError
+ assert delete_results[group2] == NonEmptyGroupError
+ assert delete_results[group3] == GroupIdNotFoundError
+
+ consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
+ assert group1 not in consumergroups
+ assert group2 in consumergroups
+ assert group3 not in consumergroups