diff options
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r-- | kafka/admin/client.py | 93 |
1 files changed, 89 insertions, 4 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 1b91e1b..97fe73a 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -19,7 +19,9 @@ from kafka.errors import ( from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, + DeleteGroupsRequest +) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.types import Array @@ -337,12 +339,34 @@ class KafkaAdminClient(object): name as a string. :return: The node_id of the broker that is the coordinator. """ - # Note: Java may change how this is implemented in KAFKA-6791. future = self._find_coordinator_id_send_request(group_id) self._wait_for_futures([future]) response = future.value return self._find_coordinator_id_process_response(response) + def _find_many_coordinator_ids(self, group_ids): + """Find the broker node_id of the coordinator for each of the given groups. + + Sends a FindCoordinatorRequest message to the cluster for each group_id. + Will block until the FindCoordinatorResponse is received for all groups. + Any errors are immediately raised. + + :param group_ids: A list of consumer group IDs. This is typically the group + name as a string. + :return: A list of tuples (group_id, node_id) where node_id is the id + of the broker that is the coordinator for the corresponding group. + """ + futures = { + group_id: self._find_coordinator_id_send_request(group_id) + for group_id in group_ids + } + self._wait_for_futures(list(futures.values())) + groups_coordinators = [ + (group_id, self._find_coordinator_id_process_response(f.value)) + for group_id, f in futures.items() + ] + return groups_coordinators + def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. @@ -1261,8 +1285,69 @@ class KafkaAdminClient(object): response = future.value return self._list_consumer_group_offsets_process_response(response) - # delete groups protocol not yet implemented - # Note: send the request to the group's coordinator. + def delete_consumer_groups(self, group_ids, group_coordinator_id=None): + """Delete Consumer Group Offsets for given consumer groups. + + Note: + This does not verify that the group ids actually exist and + group_coordinator_id is the correct coordinator for all these groups. + + The result needs checking for potential errors. + + :param group_ids: The consumer group ids of the groups which are to be deleted. + :param group_coordinator_id: The node_id of the broker which is the coordinator for + all the groups. Use only if all groups are coordinated by the same broker. + If set to None, will query the cluster to find the coordinator for every single group. + Explicitly specifying this can be useful to prevent + that extra network round trips if you already know the group + coordinator. Default: None. + :return: A list of tuples (group_id, KafkaError) + """ + if group_coordinator_id is not None: + futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)] + else: + groups_coordinators = defaultdict(list) + for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids): + groups_coordinators[group_coordinator_id].append(group_id) + futures = [ + self._delete_consumer_groups_send_request(group_ids, group_coordinator_id) + for group_coordinator_id, group_ids in groups_coordinators.items() + ] + + self._wait_for_futures(futures) + + results = [] + for f in futures: + results.extend(self._convert_delete_groups_response(f.value)) + return results + + def _convert_delete_groups_response(self, response): + if response.API_VERSION <= 1: + results = [] + for group_id, error_code in response.results: + results.append((group_id, Errors.for_code(error_code))) + return results + else: + raise NotImplementedError( + "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + + def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): + """Send a DeleteGroups request to a broker. + + :param group_ids: The consumer group ids of the groups which are to be deleted. + :param group_coordinator_id: The node_id of the broker which is the coordinator for + all the groups. + :return: A message future + """ + version = self._matching_api_version(DeleteGroupsRequest) + if version <= 1: + request = DeleteGroupsRequest[version](group_ids) + else: + raise NotImplementedError( + "Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return self._send_request_to_node(group_coordinator_id, request) def _wait_for_futures(self, futures): while not all(future.succeeded() for future in futures): |