summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_admin.py31
-rw-r--r--test/test_admin_integration.py107
2 files changed, 138 insertions, 0 deletions
diff --git a/test/test_admin.py b/test/test_admin.py
index 300d5bc..279f85a 100644
--- a/test/test_admin.py
+++ b/test/test_admin.py
@@ -26,6 +26,37 @@ def test_new_partitions():
assert good_partitions.new_assignments == [[1, 2, 3]]
+def test_acl_resource():
+ good_acl = kafka.admin.ACL(
+ "User:bar",
+ "*",
+ kafka.admin.ACLOperation.ALL,
+ kafka.admin.ACLPermissionType.ALLOW,
+ kafka.admin.ResourcePattern(
+ kafka.admin.ResourceType.TOPIC,
+ "foo",
+ kafka.admin.ACLResourcePatternType.LITERAL
+ )
+ )
+
+ assert(good_acl.resource_pattern.resource_type == kafka.admin.ResourceType.TOPIC)
+ assert(good_acl.operation == kafka.admin.ACLOperation.ALL)
+ assert(good_acl.permission_type == kafka.admin.ACLPermissionType.ALLOW)
+ assert(good_acl.resource_pattern.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL)
+
+ with pytest.raises(IllegalArgumentError):
+ kafka.admin.ACL(
+ "User:bar",
+ "*",
+ kafka.admin.ACLOperation.ANY,
+ kafka.admin.ACLPermissionType.ANY,
+ kafka.admin.ResourcePattern(
+ kafka.admin.ResourceType.TOPIC,
+ "foo",
+ kafka.admin.ACLResourcePatternType.LITERAL
+ )
+ )
+
def test_new_topic():
with pytest.raises(IllegalArgumentError):
bad_topic = kafka.admin.NewTopic('foo', -1, -1)
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
new file mode 100644
index 0000000..0be1920
--- /dev/null
+++ b/test/test_admin_integration.py
@@ -0,0 +1,107 @@
+import pytest
+import os
+
+from test.fixtures import ZookeeperFixture, KafkaFixture, version
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset
+
+from kafka.errors import NoError
+from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
+
+
+class TestAdminClientIntegration(KafkaIntegrationTestCase):
+ @classmethod
+ def setUpClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
+ cls.zk = ZookeeperFixture.instance()
+ cls.server = KafkaFixture.instance(0, cls.zk)
+
+ @classmethod
+ def tearDownClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
+ cls.server.close()
+ cls.zk.close()
+
+ @kafka_versions('>=0.9.0')
+ def test_create_describe_delete_acls(self):
+ """Tests that we can add, list and remove ACLs
+ """
+
+ # Setup
+ brokers = '%s:%d' % (self.server.host, self.server.port)
+ admin_client = KafkaAdminClient(
+ bootstrap_servers=brokers
+ )
+
+ # Check that we don't have any ACLs in the cluster
+ acls, error = admin_client.describe_acls(
+ ACLFilter(
+ principal=None,
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ )
+
+ self.assertIs(error, NoError)
+ self.assertEqual(0, len(acls))
+
+ # Try to add an ACL
+ acl = ACL(
+ principal="User:test",
+ host="*",
+ operation=ACLOperation.READ,
+ permission_type=ACLPermissionType.ALLOW,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ result = admin_client.create_acls([acl])
+
+ self.assertFalse(len(result["failed"]))
+ self.assertEqual(len(result["succeeded"]), 1)
+
+ # Check that we can list the ACL we created
+ acl_filter = ACLFilter(
+ principal=None,
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ acls, error = admin_client.describe_acls(acl_filter)
+
+ self.assertIs(error, NoError)
+ self.assertEqual(1, len(acls))
+
+ # Remove the ACL
+ delete_results = admin_client.delete_acls(
+ [
+ ACLFilter(
+ principal="User:test",
+ host="*",
+ operation=ACLOperation.READ,
+ permission_type=ACLPermissionType.ALLOW,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ ]
+ )
+
+ self.assertEqual(1, len(delete_results))
+ self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs
+
+
+ # Make sure the ACL does not exist in the cluster anymore
+ acls, error = admin_client.describe_acls(
+ ACLFilter(
+ principal="*",
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ )
+ self.assertIs(error, NoError)
+ self.assertEqual(0, len(acls))