summaryrefslogtreecommitdiff
path: root/kafka/admin/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r--kafka/admin/client.py93
1 files changed, 89 insertions, 4 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 1b91e1b..97fe73a 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -19,7 +19,9 @@ from kafka.errors import (
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
- ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
+ ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
+ DeleteGroupsRequest
+)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Array
@@ -337,12 +339,34 @@ class KafkaAdminClient(object):
name as a string.
:return: The node_id of the broker that is the coordinator.
"""
- # Note: Java may change how this is implemented in KAFKA-6791.
future = self._find_coordinator_id_send_request(group_id)
self._wait_for_futures([future])
response = future.value
return self._find_coordinator_id_process_response(response)
+ def _find_many_coordinator_ids(self, group_ids):
+ """Find the broker node_id of the coordinator for each of the given groups.
+
+ Sends a FindCoordinatorRequest message to the cluster for each group_id.
+ Will block until the FindCoordinatorResponse is received for all groups.
+ Any errors are immediately raised.
+
+ :param group_ids: A list of consumer group IDs. This is typically the group
+ name as a string.
+ :return: A list of tuples (group_id, node_id) where node_id is the id
+ of the broker that is the coordinator for the corresponding group.
+ """
+ futures = {
+ group_id: self._find_coordinator_id_send_request(group_id)
+ for group_id in group_ids
+ }
+ self._wait_for_futures(list(futures.values()))
+ groups_coordinators = [
+ (group_id, self._find_coordinator_id_process_response(f.value))
+ for group_id, f in futures.items()
+ ]
+ return groups_coordinators
+
def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.
@@ -1261,8 +1285,69 @@ class KafkaAdminClient(object):
response = future.value
return self._list_consumer_group_offsets_process_response(response)
- # delete groups protocol not yet implemented
- # Note: send the request to the group's coordinator.
+ def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
+ """Delete Consumer Group Offsets for given consumer groups.
+
+ Note:
+ This does not verify that the group ids actually exist and
+ group_coordinator_id is the correct coordinator for all these groups.
+
+ The result needs checking for potential errors.
+
+ :param group_ids: The consumer group ids of the groups which are to be deleted.
+ :param group_coordinator_id: The node_id of the broker which is the coordinator for
+ all the groups. Use only if all groups are coordinated by the same broker.
+ If set to None, will query the cluster to find the coordinator for every single group.
+ Explicitly specifying this can be useful to prevent
+ that extra network round trips if you already know the group
+ coordinator. Default: None.
+ :return: A list of tuples (group_id, KafkaError)
+ """
+ if group_coordinator_id is not None:
+ futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
+ else:
+ groups_coordinators = defaultdict(list)
+ for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
+ groups_coordinators[group_coordinator_id].append(group_id)
+ futures = [
+ self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
+ for group_coordinator_id, group_ids in groups_coordinators.items()
+ ]
+
+ self._wait_for_futures(futures)
+
+ results = []
+ for f in futures:
+ results.extend(self._convert_delete_groups_response(f.value))
+ return results
+
+ def _convert_delete_groups_response(self, response):
+ if response.API_VERSION <= 1:
+ results = []
+ for group_id, error_code in response.results:
+ results.append((group_id, Errors.for_code(error_code)))
+ return results
+ else:
+ raise NotImplementedError(
+ "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+
+ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
+ """Send a DeleteGroups request to a broker.
+
+ :param group_ids: The consumer group ids of the groups which are to be deleted.
+ :param group_coordinator_id: The node_id of the broker which is the coordinator for
+ all the groups.
+ :return: A message future
+ """
+ version = self._matching_api_version(DeleteGroupsRequest)
+ if version <= 1:
+ request = DeleteGroupsRequest[version](group_ids)
+ else:
+ raise NotImplementedError(
+ "Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(group_coordinator_id, request)
def _wait_for_futures(self, futures):
while not all(future.succeeded() for future in futures):