diff options
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r-- | kafka/admin/client.py | 273 |
1 files changed, 265 insertions, 8 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index badac32..0ade3e9 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -11,14 +11,16 @@ from kafka.client_async import KafkaClient, selectors import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, - UnrecognizedBrokerVersion) + UnrecognizedBrokerVersion, IllegalArgumentError) from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ + ACLResourcePatternType from kafka.version import __version__ @@ -470,14 +472,269 @@ class KafkaAdminClient(object): # describe cluster functionality is in ClusterMetadata # Note: if implemented here, send the request to the least_loaded_node() - # describe_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + @staticmethod + def _convert_describe_acls_response_to_acls(describe_response): + version = describe_response.API_VERSION + + error = Errors.for_code(describe_response.error_code) + acl_list = [] + for resources in describe_response.resources: + if version == 0: + resource_type, resource_name, acls = resources + resource_pattern_type = ACLResourcePatternType.LITERAL.value + elif version <= 1: + resource_type, resource_name, resource_pattern_type, acls = resources + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + for acl in acls: + principal, host, operation, permission_type = acl + conv_acl = ACL( + principal=principal, + host=host, + operation=ACLOperation(operation), + permission_type=ACLPermissionType(permission_type), + resource_pattern=ResourcePattern( + ResourceType(resource_type), + resource_name, + ACLResourcePatternType(resource_pattern_type) + ) + ) + acl_list.append(conv_acl) + + return (acl_list, error,) + + def describe_acls(self, acl_filter): + """Describe a set of ACLs + + Used to return a set of ACLs matching the supplied ACLFilter. + The cluster must be configured with an authorizer for this to work, or + you will get a SecurityDisabledError + + :param acl_filter: an ACLFilter object + :return: tuple of a list of matching ACL objects and a KafkaError (NoError if successful) + """ - # create_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + version = self._matching_api_version(DescribeAclsRequest) + if version == 0: + request = DescribeAclsRequest[version]( + resource_type=acl_filter.resource_pattern.resource_type, + resource_name=acl_filter.resource_pattern.resource_name, + principal=acl_filter.principal, + host=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type + ) + elif version <= 1: + request = DescribeAclsRequest[version]( + resource_type=acl_filter.resource_pattern.resource_type, + resource_name=acl_filter.resource_pattern.resource_name, + resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type, + principal=acl_filter.principal, + host=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type - # delete_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + ) + else: + raise NotImplementedError( + "Support for DescribeAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value + + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + # optionally we could retry if error_type.retriable + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + + return self._convert_describe_acls_response_to_acls(response) + + @staticmethod + def _convert_create_acls_resource_request_v0(acl): + + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_create_acls_resource_request_v1(acl): + + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.resource_pattern.pattern_type, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_create_acls_response_to_acls(acls, create_response): + version = create_response.API_VERSION + + creations_error = [] + creations_success = [] + for i, creations in enumerate(create_response.creation_responses): + if version <= 1: + error_code, error_message = creations + acl = acls[i] + error = Errors.for_code(error_code) + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + if error is Errors.NoError: + creations_success.append(acl) + else: + creations_error.append((acl, error,)) + + return {"succeeded": creations_success, "failed": creations_error} + + def create_acls(self, acls): + """Create a list of ACLs + + This endpoint only accepts a list of concrete ACL objects, no ACLFilters. + Throws TopicAlreadyExistsError if topic is already present. + + :param acls: a list of ACL objects + :return: dict of successes and failures + """ + + for acl in acls: + if not isinstance(acl, ACL): + raise IllegalArgumentError("acls must contain ACL objects") + + version = self._matching_api_version(CreateAclsRequest) + if version == 0: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls] + ) + elif version <= 1: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls] + ) + else: + raise NotImplementedError( + "Support for CreateAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value + + + return self._convert_create_acls_response_to_acls(acls, response) + + @staticmethod + def _convert_delete_acls_resource_request_v0(acl): + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_delete_acls_resource_request_v1(acl): + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.resource_pattern.pattern_type, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response): + version = delete_response.API_VERSION + filter_result_list = [] + for i, filter_responses in enumerate(delete_response.filter_responses): + filter_error_code, filter_error_message, matching_acls = filter_responses + filter_error = Errors.for_code(filter_error_code) + acl_result_list = [] + for acl in matching_acls: + if version == 0: + error_code, error_message, resource_type, resource_name, principal, host, operation, permission_type = acl + resource_pattern_type = ACLResourcePatternType.LITERAL.value + elif version == 1: + error_code, error_message, resource_type, resource_name, resource_pattern_type, principal, host, operation, permission_type = acl + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + acl_error = Errors.for_code(error_code) + conv_acl = ACL( + principal=principal, + host=host, + operation=ACLOperation(operation), + permission_type=ACLPermissionType(permission_type), + resource_pattern=ResourcePattern( + ResourceType(resource_type), + resource_name, + ACLResourcePatternType(resource_pattern_type) + ) + ) + acl_result_list.append((conv_acl, acl_error,)) + filter_result_list.append((acl_filters[i], acl_result_list, filter_error,)) + return filter_result_list + + def delete_acls(self, acl_filters): + """Delete a set of ACLs + + Deletes all ACLs matching the list of input ACLFilter + + :param acl_filters: a list of ACLFilter + :return: a list of 3-tuples corresponding to the list of input filters. + The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance) + """ + + for acl in acl_filters: + if not isinstance(acl, ACLFilter): + raise IllegalArgumentError("acl_filters must contain ACLFilter type objects") + + version = self._matching_api_version(DeleteAclsRequest) + + if version == 0: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters] + ) + elif version <= 1: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters] + ) + else: + raise NotImplementedError( + "Support for DeleteAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value + + return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) @staticmethod def _convert_describe_config_resource_request(config_resource): |