diff options
author | Ulrik Johansson <ulrik.johansson@gmail.com> | 2019-09-28 23:57:05 +0200 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-09-28 14:57:05 -0700 |
commit | 76ad6629350f20acfd6038c1e444a89bcd255f89 (patch) | |
tree | 00439631523c0c7b5df835f33b92f0f8d95f5812 /test/test_admin_integration.py | |
parent | 5e4d1516e0d903e411c71474cc5ba9e9b009cd8c (diff) | |
download | kafka-python-76ad6629350f20acfd6038c1e444a89bcd255f89.tar.gz |
Add ACL api to KafkaAdminClient (#1833)
Diffstat (limited to 'test/test_admin_integration.py')
-rw-r--r-- | test/test_admin_integration.py | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py new file mode 100644 index 0000000..0be1920 --- /dev/null +++ b/test/test_admin_integration.py @@ -0,0 +1,107 @@ +import pytest +import os + +from test.fixtures import ZookeeperFixture, KafkaFixture, version +from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset + +from kafka.errors import NoError +from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL + + +class TestAdminClientIntegration(KafkaIntegrationTestCase): + @classmethod + def setUpClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + + cls.zk = ZookeeperFixture.instance() + cls.server = KafkaFixture.instance(0, cls.zk) + + @classmethod + def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + + cls.server.close() + cls.zk.close() + + @kafka_versions('>=0.9.0') + def test_create_describe_delete_acls(self): + """Tests that we can add, list and remove ACLs + """ + + # Setup + brokers = '%s:%d' % (self.server.host, self.server.port) + admin_client = KafkaAdminClient( + bootstrap_servers=brokers + ) + + # Check that we don't have any ACLs in the cluster + acls, error = admin_client.describe_acls( + ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ) + + self.assertIs(error, NoError) + self.assertEqual(0, len(acls)) + + # Try to add an ACL + acl = ACL( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + result = admin_client.create_acls([acl]) + + self.assertFalse(len(result["failed"])) + self.assertEqual(len(result["succeeded"]), 1) + + # Check that we can list the ACL we created + acl_filter = ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + acls, error = admin_client.describe_acls(acl_filter) + + self.assertIs(error, NoError) + self.assertEqual(1, len(acls)) + + # Remove the ACL + delete_results = admin_client.delete_acls( + [ + ACLFilter( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ] + ) + + self.assertEqual(1, len(delete_results)) + self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs + + + # Make sure the ACL does not exist in the cluster anymore + acls, error = admin_client.describe_acls( + ACLFilter( + principal="*", + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ) + self.assertIs(error, NoError) + self.assertEqual(0, len(acls)) |