diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_admin.py | 31 | ||||
-rw-r--r-- | test/test_admin_integration.py | 107 |
2 files changed, 138 insertions, 0 deletions
diff --git a/test/test_admin.py b/test/test_admin.py index 300d5bc..279f85a 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -26,6 +26,37 @@ def test_new_partitions(): assert good_partitions.new_assignments == [[1, 2, 3]] +def test_acl_resource(): + good_acl = kafka.admin.ACL( + "User:bar", + "*", + kafka.admin.ACLOperation.ALL, + kafka.admin.ACLPermissionType.ALLOW, + kafka.admin.ResourcePattern( + kafka.admin.ResourceType.TOPIC, + "foo", + kafka.admin.ACLResourcePatternType.LITERAL + ) + ) + + assert(good_acl.resource_pattern.resource_type == kafka.admin.ResourceType.TOPIC) + assert(good_acl.operation == kafka.admin.ACLOperation.ALL) + assert(good_acl.permission_type == kafka.admin.ACLPermissionType.ALLOW) + assert(good_acl.resource_pattern.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL) + + with pytest.raises(IllegalArgumentError): + kafka.admin.ACL( + "User:bar", + "*", + kafka.admin.ACLOperation.ANY, + kafka.admin.ACLPermissionType.ANY, + kafka.admin.ResourcePattern( + kafka.admin.ResourceType.TOPIC, + "foo", + kafka.admin.ACLResourcePatternType.LITERAL + ) + ) + def test_new_topic(): with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', -1, -1) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py new file mode 100644 index 0000000..0be1920 --- /dev/null +++ b/test/test_admin_integration.py @@ -0,0 +1,107 @@ +import pytest +import os + +from test.fixtures import ZookeeperFixture, KafkaFixture, version +from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset + +from kafka.errors import NoError +from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL + + +class TestAdminClientIntegration(KafkaIntegrationTestCase): + @classmethod + def setUpClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + + cls.zk = ZookeeperFixture.instance() + cls.server = KafkaFixture.instance(0, cls.zk) + + @classmethod + def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + + cls.server.close() + cls.zk.close() + + @kafka_versions('>=0.9.0') + def test_create_describe_delete_acls(self): + """Tests that we can add, list and remove ACLs + """ + + # Setup + brokers = '%s:%d' % (self.server.host, self.server.port) + admin_client = KafkaAdminClient( + bootstrap_servers=brokers + ) + + # Check that we don't have any ACLs in the cluster + acls, error = admin_client.describe_acls( + ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ) + + self.assertIs(error, NoError) + self.assertEqual(0, len(acls)) + + # Try to add an ACL + acl = ACL( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + result = admin_client.create_acls([acl]) + + self.assertFalse(len(result["failed"])) + self.assertEqual(len(result["succeeded"]), 1) + + # Check that we can list the ACL we created + acl_filter = ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + acls, error = admin_client.describe_acls(acl_filter) + + self.assertIs(error, NoError) + self.assertEqual(1, len(acls)) + + # Remove the ACL + delete_results = admin_client.delete_acls( + [ + ACLFilter( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ] + ) + + self.assertEqual(1, len(delete_results)) + self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs + + + # Make sure the ACL does not exist in the cluster anymore + acls, error = admin_client.describe_acls( + ACLFilter( + principal="*", + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ) + self.assertIs(error, NoError) + self.assertEqual(0, len(acls)) |