diff options
Diffstat (limited to 'tests/test_commands.py')
-rw-r--r-- | tests/test_commands.py | 176 |
1 files changed, 175 insertions, 1 deletions
diff --git a/tests/test_commands.py b/tests/test_commands.py index ef316af..00752fa 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -7,7 +7,7 @@ import redis import time from redis._compat import (unichr, ascii_letters, iteritems, iterkeys, - itervalues, long) + itervalues, long, basestring) from redis.client import parse_info from redis import exceptions @@ -66,6 +66,180 @@ class TestRedisCommands(object): r['a'] # SERVER INFORMATION + @skip_if_server_version_lt('5.9.101') + def test_acl_cat_no_category(self, r): + categories = r.acl_cat() + assert isinstance(categories, list) + assert 'read' in categories + + @skip_if_server_version_lt('5.9.101') + def test_acl_cat_with_category(self, r): + commands = r.acl_cat('read') + assert isinstance(commands, list) + assert 'get' in commands + + @skip_if_server_version_lt('5.9.101') + def test_acl_deluser(self, r, request): + username = 'redis-py-user' + + def teardown(): + r.acl_deluser(username) + + request.addfinalizer(teardown) + + assert r.acl_deluser(username) == 0 + assert r.acl_setuser(username, enabled=False, reset=True) + assert r.acl_deluser(username) == 1 + + @skip_if_server_version_lt('5.9.101') + def test_acl_genpass(self, r): + password = r.acl_genpass() + assert isinstance(password, basestring) + + @skip_if_server_version_lt('5.9.101') + def test_acl_getuser_setuser(self, r, request): + username = 'redis-py-user' + + def teardown(): + r.acl_deluser(username) + request.addfinalizer(teardown) + + # test enabled=False + assert r.acl_setuser(username, enabled=False, reset=True) + assert r.acl_getuser(username) == { + 'categories': ['-@all'], + 'commands': [], + 'enabled': False, + 'flags': ['off'], + 'keys': [], + 'passwords': [], + } + + # test nopass=True + assert r.acl_setuser(username, enabled=True, reset=True, nopass=True) + assert r.acl_getuser(username) == { + 'categories': ['-@all'], + 'commands': [], + 'enabled': True, + 'flags': ['on', 'nopass'], + 'keys': [], + 'passwords': [], + } + + # test all args + assert r.acl_setuser(username, enabled=True, reset=True, + passwords=['+pass1', '+pass2'], + categories=['+set', '+@hash', '-geo'], + commands=['+get', '+mget', '-hset'], + keys=['cache:*', 'objects:*']) + acl = r.acl_getuser(username) + assert set(acl['categories']) == set(['-@all', '+@set', '+@hash']) + assert set(acl['commands']) == set(['+get', '+mget', '-hset']) + assert acl['enabled'] is True + assert acl['flags'] == ['on'] + assert set(acl['keys']) == set([b'cache:*', b'objects:*']) + assert len(acl['passwords']) == 2 + + # test reset=False keeps existing ACL and applies new ACL on top + assert r.acl_setuser(username, enabled=True, reset=True, + passwords=['+pass1'], + categories=['+@set'], + commands=['+get'], + keys=['cache:*']) + assert r.acl_setuser(username, enabled=True, + passwords=['+pass2'], + categories=['+@hash'], + commands=['+mget'], + keys=['objects:*']) + acl = r.acl_getuser(username) + assert set(acl['categories']) == set(['-@all', '+@set', '+@hash']) + assert set(acl['commands']) == set(['+get', '+mget']) + assert acl['enabled'] is True + assert acl['flags'] == ['on'] + assert set(acl['keys']) == set([b'cache:*', b'objects:*']) + assert len(acl['passwords']) == 2 + + # test removal of passwords + assert r.acl_setuser(username, enabled=True, reset=True, + passwords=['+pass1', '+pass2']) + assert len(r.acl_getuser(username)['passwords']) == 2 + assert r.acl_setuser(username, enabled=True, + passwords=['-pass2']) + assert len(r.acl_getuser(username)['passwords']) == 1 + + # Resets and tests that hashed passwords are set properly. + hashed_password = ('5e884898da28047151d0e56f8dc629' + '2773603d0d6aabbdd62a11ef721d1542d8') + assert r.acl_setuser(username, enabled=True, reset=True, + hashed_passwords=['+' + hashed_password]) + acl = r.acl_getuser(username) + assert acl['passwords'] == [hashed_password] + + # test removal of hashed passwords + assert r.acl_setuser(username, enabled=True, reset=True, + hashed_passwords=['+' + hashed_password], + passwords=['+pass1']) + assert len(r.acl_getuser(username)['passwords']) == 2 + assert r.acl_setuser(username, enabled=True, + hashed_passwords=['-' + hashed_password]) + assert len(r.acl_getuser(username)['passwords']) == 1 + + @skip_if_server_version_lt('5.9.101') + def test_acl_list(self, r, request): + username = 'redis-py-user' + + def teardown(): + r.acl_deluser(username) + request.addfinalizer(teardown) + + assert r.acl_setuser(username, enabled=False, reset=True) + users = r.acl_list() + assert 'user %s off -@all' % username in users + + @skip_if_server_version_lt('5.9.101') + def test_acl_setuser_categories_without_prefix_fails(self, r, request): + username = 'redis-py-user' + + def teardown(): + r.acl_deluser(username) + request.addfinalizer(teardown) + + with pytest.raises(exceptions.DataError): + r.acl_setuser(username, categories=['list']) + + @skip_if_server_version_lt('5.9.101') + def test_acl_setuser_commands_without_prefix_fails(self, r, request): + username = 'redis-py-user' + + def teardown(): + r.acl_deluser(username) + request.addfinalizer(teardown) + + with pytest.raises(exceptions.DataError): + r.acl_setuser(username, commands=['get']) + + @skip_if_server_version_lt('5.9.101') + def test_acl_setuser_add_passwords_and_nopass_fails(self, r, request): + username = 'redis-py-user' + + def teardown(): + r.acl_deluser(username) + request.addfinalizer(teardown) + + with pytest.raises(exceptions.DataError): + r.acl_setuser(username, passwords='+mypass', nopass=True) + + @skip_if_server_version_lt('5.9.101') + def test_acl_users(self, r): + users = r.acl_users() + assert isinstance(users, list) + assert len(users) > 0 + + @skip_if_server_version_lt('5.9.101') + def test_acl_whoami(self, r): + username = r.acl_whoami() + assert isinstance(username, basestring) + def test_client_list(self, r): clients = r.client_list() assert isinstance(clients[0], dict) |