summaryrefslogtreecommitdiff
path: root/tests/test_commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_commands.py')
-rw-r--r--tests/test_commands.py176
1 files changed, 175 insertions, 1 deletions
diff --git a/tests/test_commands.py b/tests/test_commands.py
index ef316af..00752fa 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -7,7 +7,7 @@ import redis
import time
from redis._compat import (unichr, ascii_letters, iteritems, iterkeys,
- itervalues, long)
+ itervalues, long, basestring)
from redis.client import parse_info
from redis import exceptions
@@ -66,6 +66,180 @@ class TestRedisCommands(object):
r['a']
# SERVER INFORMATION
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_cat_no_category(self, r):
+ categories = r.acl_cat()
+ assert isinstance(categories, list)
+ assert 'read' in categories
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_cat_with_category(self, r):
+ commands = r.acl_cat('read')
+ assert isinstance(commands, list)
+ assert 'get' in commands
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_deluser(self, r, request):
+ username = 'redis-py-user'
+
+ def teardown():
+ r.acl_deluser(username)
+
+ request.addfinalizer(teardown)
+
+ assert r.acl_deluser(username) == 0
+ assert r.acl_setuser(username, enabled=False, reset=True)
+ assert r.acl_deluser(username) == 1
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_genpass(self, r):
+ password = r.acl_genpass()
+ assert isinstance(password, basestring)
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_getuser_setuser(self, r, request):
+ username = 'redis-py-user'
+
+ def teardown():
+ r.acl_deluser(username)
+ request.addfinalizer(teardown)
+
+ # test enabled=False
+ assert r.acl_setuser(username, enabled=False, reset=True)
+ assert r.acl_getuser(username) == {
+ 'categories': ['-@all'],
+ 'commands': [],
+ 'enabled': False,
+ 'flags': ['off'],
+ 'keys': [],
+ 'passwords': [],
+ }
+
+ # test nopass=True
+ assert r.acl_setuser(username, enabled=True, reset=True, nopass=True)
+ assert r.acl_getuser(username) == {
+ 'categories': ['-@all'],
+ 'commands': [],
+ 'enabled': True,
+ 'flags': ['on', 'nopass'],
+ 'keys': [],
+ 'passwords': [],
+ }
+
+ # test all args
+ assert r.acl_setuser(username, enabled=True, reset=True,
+ passwords=['+pass1', '+pass2'],
+ categories=['+set', '+@hash', '-geo'],
+ commands=['+get', '+mget', '-hset'],
+ keys=['cache:*', 'objects:*'])
+ acl = r.acl_getuser(username)
+ assert set(acl['categories']) == set(['-@all', '+@set', '+@hash'])
+ assert set(acl['commands']) == set(['+get', '+mget', '-hset'])
+ assert acl['enabled'] is True
+ assert acl['flags'] == ['on']
+ assert set(acl['keys']) == set([b'cache:*', b'objects:*'])
+ assert len(acl['passwords']) == 2
+
+ # test reset=False keeps existing ACL and applies new ACL on top
+ assert r.acl_setuser(username, enabled=True, reset=True,
+ passwords=['+pass1'],
+ categories=['+@set'],
+ commands=['+get'],
+ keys=['cache:*'])
+ assert r.acl_setuser(username, enabled=True,
+ passwords=['+pass2'],
+ categories=['+@hash'],
+ commands=['+mget'],
+ keys=['objects:*'])
+ acl = r.acl_getuser(username)
+ assert set(acl['categories']) == set(['-@all', '+@set', '+@hash'])
+ assert set(acl['commands']) == set(['+get', '+mget'])
+ assert acl['enabled'] is True
+ assert acl['flags'] == ['on']
+ assert set(acl['keys']) == set([b'cache:*', b'objects:*'])
+ assert len(acl['passwords']) == 2
+
+ # test removal of passwords
+ assert r.acl_setuser(username, enabled=True, reset=True,
+ passwords=['+pass1', '+pass2'])
+ assert len(r.acl_getuser(username)['passwords']) == 2
+ assert r.acl_setuser(username, enabled=True,
+ passwords=['-pass2'])
+ assert len(r.acl_getuser(username)['passwords']) == 1
+
+ # Resets and tests that hashed passwords are set properly.
+ hashed_password = ('5e884898da28047151d0e56f8dc629'
+ '2773603d0d6aabbdd62a11ef721d1542d8')
+ assert r.acl_setuser(username, enabled=True, reset=True,
+ hashed_passwords=['+' + hashed_password])
+ acl = r.acl_getuser(username)
+ assert acl['passwords'] == [hashed_password]
+
+ # test removal of hashed passwords
+ assert r.acl_setuser(username, enabled=True, reset=True,
+ hashed_passwords=['+' + hashed_password],
+ passwords=['+pass1'])
+ assert len(r.acl_getuser(username)['passwords']) == 2
+ assert r.acl_setuser(username, enabled=True,
+ hashed_passwords=['-' + hashed_password])
+ assert len(r.acl_getuser(username)['passwords']) == 1
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_list(self, r, request):
+ username = 'redis-py-user'
+
+ def teardown():
+ r.acl_deluser(username)
+ request.addfinalizer(teardown)
+
+ assert r.acl_setuser(username, enabled=False, reset=True)
+ users = r.acl_list()
+ assert 'user %s off -@all' % username in users
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_setuser_categories_without_prefix_fails(self, r, request):
+ username = 'redis-py-user'
+
+ def teardown():
+ r.acl_deluser(username)
+ request.addfinalizer(teardown)
+
+ with pytest.raises(exceptions.DataError):
+ r.acl_setuser(username, categories=['list'])
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_setuser_commands_without_prefix_fails(self, r, request):
+ username = 'redis-py-user'
+
+ def teardown():
+ r.acl_deluser(username)
+ request.addfinalizer(teardown)
+
+ with pytest.raises(exceptions.DataError):
+ r.acl_setuser(username, commands=['get'])
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_setuser_add_passwords_and_nopass_fails(self, r, request):
+ username = 'redis-py-user'
+
+ def teardown():
+ r.acl_deluser(username)
+ request.addfinalizer(teardown)
+
+ with pytest.raises(exceptions.DataError):
+ r.acl_setuser(username, passwords='+mypass', nopass=True)
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_users(self, r):
+ users = r.acl_users()
+ assert isinstance(users, list)
+ assert len(users) > 0
+
+ @skip_if_server_version_lt('5.9.101')
+ def test_acl_whoami(self, r):
+ username = r.acl_whoami()
+ assert isinstance(username, basestring)
+
def test_client_list(self, r):
clients = r.client_list()
assert isinstance(clients[0], dict)