summaryrefslogtreecommitdiff
path: root/kafka/admin/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r--kafka/admin/client.py273
1 files changed, 265 insertions, 8 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index badac32..0ade3e9 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -11,14 +11,16 @@ from kafka.client_async import KafkaClient, selectors
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
- UnrecognizedBrokerVersion)
+ UnrecognizedBrokerVersion, IllegalArgumentError)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
- ListGroupsRequest, DescribeGroupsRequest)
+ ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
+ ACLResourcePatternType
from kafka.version import __version__
@@ -470,14 +472,269 @@ class KafkaAdminClient(object):
# describe cluster functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()
- # describe_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
+ @staticmethod
+ def _convert_describe_acls_response_to_acls(describe_response):
+ version = describe_response.API_VERSION
+
+ error = Errors.for_code(describe_response.error_code)
+ acl_list = []
+ for resources in describe_response.resources:
+ if version == 0:
+ resource_type, resource_name, acls = resources
+ resource_pattern_type = ACLResourcePatternType.LITERAL.value
+ elif version <= 1:
+ resource_type, resource_name, resource_pattern_type, acls = resources
+ else:
+ raise NotImplementedError(
+ "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+ for acl in acls:
+ principal, host, operation, permission_type = acl
+ conv_acl = ACL(
+ principal=principal,
+ host=host,
+ operation=ACLOperation(operation),
+ permission_type=ACLPermissionType(permission_type),
+ resource_pattern=ResourcePattern(
+ ResourceType(resource_type),
+ resource_name,
+ ACLResourcePatternType(resource_pattern_type)
+ )
+ )
+ acl_list.append(conv_acl)
+
+ return (acl_list, error,)
+
+ def describe_acls(self, acl_filter):
+ """Describe a set of ACLs
+
+ Used to return a set of ACLs matching the supplied ACLFilter.
+ The cluster must be configured with an authorizer for this to work, or
+ you will get a SecurityDisabledError
+
+ :param acl_filter: an ACLFilter object
+ :return: tuple of a list of matching ACL objects and a KafkaError (NoError if successful)
+ """
- # create_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
+ version = self._matching_api_version(DescribeAclsRequest)
+ if version == 0:
+ request = DescribeAclsRequest[version](
+ resource_type=acl_filter.resource_pattern.resource_type,
+ resource_name=acl_filter.resource_pattern.resource_name,
+ principal=acl_filter.principal,
+ host=acl_filter.host,
+ operation=acl_filter.operation,
+ permission_type=acl_filter.permission_type
+ )
+ elif version <= 1:
+ request = DescribeAclsRequest[version](
+ resource_type=acl_filter.resource_pattern.resource_type,
+ resource_name=acl_filter.resource_pattern.resource_name,
+ resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type,
+ principal=acl_filter.principal,
+ host=acl_filter.host,
+ operation=acl_filter.operation,
+ permission_type=acl_filter.permission_type
- # delete_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
+ )
+ else:
+ raise NotImplementedError(
+ "Support for DescribeAcls v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ self._wait_for_futures([future])
+ response = future.value
+
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ # optionally we could retry if error_type.retriable
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+
+ return self._convert_describe_acls_response_to_acls(response)
+
+ @staticmethod
+ def _convert_create_acls_resource_request_v0(acl):
+
+ return (
+ acl.resource_pattern.resource_type,
+ acl.resource_pattern.resource_name,
+ acl.principal,
+ acl.host,
+ acl.operation,
+ acl.permission_type
+ )
+
+ @staticmethod
+ def _convert_create_acls_resource_request_v1(acl):
+
+ return (
+ acl.resource_pattern.resource_type,
+ acl.resource_pattern.resource_name,
+ acl.resource_pattern.pattern_type,
+ acl.principal,
+ acl.host,
+ acl.operation,
+ acl.permission_type
+ )
+
+ @staticmethod
+ def _convert_create_acls_response_to_acls(acls, create_response):
+ version = create_response.API_VERSION
+
+ creations_error = []
+ creations_success = []
+ for i, creations in enumerate(create_response.creation_responses):
+ if version <= 1:
+ error_code, error_message = creations
+ acl = acls[i]
+ error = Errors.for_code(error_code)
+ else:
+ raise NotImplementedError(
+ "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+
+ if error is Errors.NoError:
+ creations_success.append(acl)
+ else:
+ creations_error.append((acl, error,))
+
+ return {"succeeded": creations_success, "failed": creations_error}
+
+ def create_acls(self, acls):
+ """Create a list of ACLs
+
+ This endpoint only accepts a list of concrete ACL objects, no ACLFilters.
+ Throws TopicAlreadyExistsError if topic is already present.
+
+ :param acls: a list of ACL objects
+ :return: dict of successes and failures
+ """
+
+ for acl in acls:
+ if not isinstance(acl, ACL):
+ raise IllegalArgumentError("acls must contain ACL objects")
+
+ version = self._matching_api_version(CreateAclsRequest)
+ if version == 0:
+ request = CreateAclsRequest[version](
+ creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls]
+ )
+ elif version <= 1:
+ request = CreateAclsRequest[version](
+ creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls]
+ )
+ else:
+ raise NotImplementedError(
+ "Support for CreateAcls v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ self._wait_for_futures([future])
+ response = future.value
+
+
+ return self._convert_create_acls_response_to_acls(acls, response)
+
+ @staticmethod
+ def _convert_delete_acls_resource_request_v0(acl):
+ return (
+ acl.resource_pattern.resource_type,
+ acl.resource_pattern.resource_name,
+ acl.principal,
+ acl.host,
+ acl.operation,
+ acl.permission_type
+ )
+
+ @staticmethod
+ def _convert_delete_acls_resource_request_v1(acl):
+ return (
+ acl.resource_pattern.resource_type,
+ acl.resource_pattern.resource_name,
+ acl.resource_pattern.pattern_type,
+ acl.principal,
+ acl.host,
+ acl.operation,
+ acl.permission_type
+ )
+
+ @staticmethod
+ def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response):
+ version = delete_response.API_VERSION
+ filter_result_list = []
+ for i, filter_responses in enumerate(delete_response.filter_responses):
+ filter_error_code, filter_error_message, matching_acls = filter_responses
+ filter_error = Errors.for_code(filter_error_code)
+ acl_result_list = []
+ for acl in matching_acls:
+ if version == 0:
+ error_code, error_message, resource_type, resource_name, principal, host, operation, permission_type = acl
+ resource_pattern_type = ACLResourcePatternType.LITERAL.value
+ elif version == 1:
+ error_code, error_message, resource_type, resource_name, resource_pattern_type, principal, host, operation, permission_type = acl
+ else:
+ raise NotImplementedError(
+ "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+ acl_error = Errors.for_code(error_code)
+ conv_acl = ACL(
+ principal=principal,
+ host=host,
+ operation=ACLOperation(operation),
+ permission_type=ACLPermissionType(permission_type),
+ resource_pattern=ResourcePattern(
+ ResourceType(resource_type),
+ resource_name,
+ ACLResourcePatternType(resource_pattern_type)
+ )
+ )
+ acl_result_list.append((conv_acl, acl_error,))
+ filter_result_list.append((acl_filters[i], acl_result_list, filter_error,))
+ return filter_result_list
+
+ def delete_acls(self, acl_filters):
+ """Delete a set of ACLs
+
+ Deletes all ACLs matching the list of input ACLFilter
+
+ :param acl_filters: a list of ACLFilter
+ :return: a list of 3-tuples corresponding to the list of input filters.
+ The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance)
+ """
+
+ for acl in acl_filters:
+ if not isinstance(acl, ACLFilter):
+ raise IllegalArgumentError("acl_filters must contain ACLFilter type objects")
+
+ version = self._matching_api_version(DeleteAclsRequest)
+
+ if version == 0:
+ request = DeleteAclsRequest[version](
+ filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters]
+ )
+ elif version <= 1:
+ request = DeleteAclsRequest[version](
+ filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters]
+ )
+ else:
+ raise NotImplementedError(
+ "Support for DeleteAcls v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ self._wait_for_futures([future])
+ response = future.value
+
+ return self._convert_delete_acls_response_to_matching_acls(acl_filters, response)
@staticmethod
def _convert_describe_config_resource_request(config_resource):