summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2021-12-02 22:59:06 +0200
committerGitHub <noreply@github.com>2021-12-02 22:59:06 +0200
commit11b14630a6845c28acfd4220b72ed62d72913305 (patch)
tree7c4ae49db2320416a956551613d90cad26b44052
parentb7ffec08da97b71b10bbd139b32ff82d33d907f1 (diff)
downloadredis-py-11b14630a6845c28acfd4220b72ed62d72913305.tar.gz
Added support for MONITOR in clusters (#1756)
-rw-r--r--redis/cluster.py17
-rw-r--r--tests/conftest.py20
-rw-r--r--tests/test_cluster.py54
-rw-r--r--tests/test_commands.py53
-rw-r--r--tests/test_connection_pool.py12
-rw-r--r--tests/test_monitor.py4
-rw-r--r--tests/test_pubsub.py2
7 files changed, 117 insertions, 45 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index c5634a0..b1adeb7 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -670,6 +670,23 @@ class RedisCluster(RedisClusterCommands):
log.info(f"Changed the default cluster node to {node}")
return True
+ def monitor(self, target_node=None):
+ """
+ Returns a Monitor object for the specified target node.
+ The default cluster node will be selected if no target node was
+ specified.
+ Monitor is useful for handling the MONITOR command to the redis server.
+ next_command() method returns one command from monitor
+ listen() method yields commands from monitor.
+ """
+ if target_node is None:
+ target_node = self.get_default_node()
+ if target_node.redis_connection is None:
+ raise RedisClusterException(
+ f"Cluster Node {target_node.name} has no redis_connection"
+ )
+ return target_node.redis_connection.monitor()
+
def pubsub(self, node=None, host=None, port=None, **kwargs):
"""
Allows passing a ClusterNode, or host&port, to get a pubsub instance
diff --git a/tests/conftest.py b/tests/conftest.py
index 24783c0..ab29ee4 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -151,12 +151,12 @@ def skip_ifmodversion_lt(min_version: str, module_name: str):
raise AttributeError(f"No redis module named {module_name}")
-def skip_if_redis_enterprise(func):
+def skip_if_redis_enterprise():
check = REDIS_INFO["enterprise"] is True
return pytest.mark.skipif(check, reason="Redis enterprise")
-def skip_ifnot_redis_enterprise(func):
+def skip_ifnot_redis_enterprise():
check = REDIS_INFO["enterprise"] is False
return pytest.mark.skipif(check, reason="Not running in redis enterprise")
@@ -324,16 +324,18 @@ def master_host(request):
yield parts.hostname, parts.port
-def wait_for_command(client, monitor, command):
+def wait_for_command(client, monitor, command, key=None):
# issue a command with a key name that's local to this process.
# if we find a command with our key before the command we're waiting
# for, something went wrong
- redis_version = REDIS_INFO["version"]
- if LooseVersion(redis_version) >= LooseVersion("5.0.0"):
- id_str = str(client.client_id())
- else:
- id_str = f"{random.randrange(2 ** 32):08x}"
- key = f"__REDIS-PY-{id_str}__"
+ if key is None:
+ # generate key
+ redis_version = REDIS_INFO["version"]
+ if LooseVersion(redis_version) >= LooseVersion("5.0.0"):
+ id_str = str(client.client_id())
+ else:
+ id_str = f"{random.randrange(2 ** 32):08x}"
+ key = f"__REDIS-PY-{id_str}__"
client.get(key)
while True:
monitor_response = monitor.next_command()
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 4087d33..15d8ac6 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -36,6 +36,7 @@ from .conftest import (
skip_if_redis_enterprise,
skip_if_server_version_lt,
skip_unless_arch_bits,
+ wait_for_command,
)
default_host = "127.0.0.1"
@@ -1774,7 +1775,7 @@ class TestClusterRedisCommands:
assert r.randomkey(target_nodes=node) in (b"{foo}a", b"{foo}b", b"{foo}c")
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_acl_log(self, r, request):
key = "{cache}:"
node = r.get_node_from_key(key)
@@ -2631,3 +2632,54 @@ class TestReadOnlyPipeline:
if executed_on_replica:
break
assert executed_on_replica is True
+
+
+@pytest.mark.onlycluster
+class TestClusterMonitor:
+ def test_wait_command_not_found(self, r):
+ "Make sure the wait_for_command func works when command is not found"
+ key = "foo"
+ node = r.get_node_from_key(key)
+ with r.monitor(target_node=node) as m:
+ response = wait_for_command(r, m, "nothing", key=key)
+ assert response is None
+
+ def test_response_values(self, r):
+ db = 0
+ key = "foo"
+ node = r.get_node_from_key(key)
+ with r.monitor(target_node=node) as m:
+ r.ping(target_nodes=node)
+ response = wait_for_command(r, m, "PING", key=key)
+ assert isinstance(response["time"], float)
+ assert response["db"] == db
+ assert response["client_type"] in ("tcp", "unix")
+ assert isinstance(response["client_address"], str)
+ assert isinstance(response["client_port"], str)
+ assert response["command"] == "PING"
+
+ def test_command_with_quoted_key(self, r):
+ key = "{foo}1"
+ node = r.get_node_from_key(key)
+ with r.monitor(node) as m:
+ r.get('{foo}"bar')
+ response = wait_for_command(r, m, 'GET {foo}"bar', key=key)
+ assert response["command"] == 'GET {foo}"bar'
+
+ def test_command_with_binary_data(self, r):
+ key = "{foo}1"
+ node = r.get_node_from_key(key)
+ with r.monitor(target_node=node) as m:
+ byte_string = b"{foo}bar\x92"
+ r.get(byte_string)
+ response = wait_for_command(r, m, "GET {foo}bar\\x92", key=key)
+ assert response["command"] == "GET {foo}bar\\x92"
+
+ def test_command_with_escaped_data(self, r):
+ key = "{foo}1"
+ node = r.get_node_from_key(key)
+ with r.monitor(target_node=node) as m:
+ byte_string = b"{foo}bar\\x92"
+ r.get(byte_string)
+ response = wait_for_command(r, m, "GET {foo}bar\\\\x92", key=key)
+ assert response["command"] == "GET {foo}bar\\\\x92"
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 556df84..936cbe5 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -84,7 +84,7 @@ class TestRedisCommands:
assert "get" in commands
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_acl_deluser(self, r, request):
username = "redis-py-user"
@@ -109,7 +109,7 @@ class TestRedisCommands:
assert r.acl_getuser(users[4]) is None
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_acl_genpass(self, r):
password = r.acl_genpass()
assert isinstance(password, str)
@@ -123,7 +123,7 @@ class TestRedisCommands:
assert isinstance(password, str)
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_acl_getuser_setuser(self, r, request):
username = "redis-py-user"
@@ -236,7 +236,7 @@ class TestRedisCommands:
assert len(res) != 0
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_acl_list(self, r, request):
username = "redis-py-user"
@@ -250,7 +250,8 @@ class TestRedisCommands:
assert len(users) == 2
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
+ @pytest.mark.onlynoncluster
def test_acl_log(self, r, request):
username = "redis-py-user"
@@ -292,7 +293,7 @@ class TestRedisCommands:
assert r.acl_log_reset()
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_acl_setuser_categories_without_prefix_fails(self, r, request):
username = "redis-py-user"
@@ -305,7 +306,7 @@ class TestRedisCommands:
r.acl_setuser(username, categories=["list"])
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_acl_setuser_commands_without_prefix_fails(self, r, request):
username = "redis-py-user"
@@ -318,7 +319,7 @@ class TestRedisCommands:
r.acl_setuser(username, commands=["get"])
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_acl_setuser_add_passwords_and_nopass_fails(self, r, request):
username = "redis-py-user"
@@ -363,7 +364,7 @@ class TestRedisCommands:
clients = r.client_list(_type=client_type)
assert isinstance(clients, list)
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_client_list_replica(self, r):
clients = r.client_list(_type="replica")
assert isinstance(clients, list)
@@ -529,7 +530,7 @@ class TestRedisCommands:
assert r.client_kill_filter(laddr=client_2_addr)
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_client_kill_filter_by_user(self, r, request):
killuser = "user_to_kill"
r.acl_setuser(
@@ -549,7 +550,7 @@ class TestRedisCommands:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("2.9.50")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_client_pause(self, r):
assert r.client_pause(1)
assert r.client_pause(timeout=1)
@@ -558,7 +559,7 @@ class TestRedisCommands:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.2.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_client_unpause(self, r):
assert r.client_unpause() == b"OK"
@@ -578,7 +579,7 @@ class TestRedisCommands:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_client_getredir(self, r):
assert isinstance(r.client_getredir(), int)
assert r.client_getredir() == -1
@@ -590,7 +591,7 @@ class TestRedisCommands:
# assert data['maxmemory'].isdigit()
@pytest.mark.onlynoncluster
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_config_resetstat(self, r):
r.ping()
prior_commands_processed = int(r.info()["total_commands_processed"])
@@ -599,7 +600,7 @@ class TestRedisCommands:
reset_commands_processed = int(r.info()["total_commands_processed"])
assert reset_commands_processed < prior_commands_processed
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_config_set(self, r):
r.config_set("timeout", 70)
assert r.config_get()["timeout"] == "70"
@@ -626,7 +627,7 @@ class TestRedisCommands:
assert "redis_version" in info.keys()
@pytest.mark.onlynoncluster
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_lastsave(self, r):
assert isinstance(r.lastsave(), datetime.datetime)
@@ -731,7 +732,7 @@ class TestRedisCommands:
assert isinstance(t[0], int)
assert isinstance(t[1], int)
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_bgsave(self, r):
assert r.bgsave()
time.sleep(0.3)
@@ -1312,7 +1313,7 @@ class TestRedisCommands:
value2 = "mynewtext"
res = "mytext"
- if skip_if_redis_enterprise(None).args[0] is True:
+ if skip_if_redis_enterprise().args[0] is True:
with pytest.raises(redis.exceptions.ResponseError):
assert r.stralgo("LCS", value1, value2) == res
return
@@ -1354,7 +1355,7 @@ class TestRedisCommands:
def test_substr(self, r):
r["a"] = "0123456789"
- if skip_if_redis_enterprise(None).args[0] is True:
+ if skip_if_redis_enterprise().args[0] is True:
with pytest.raises(redis.exceptions.ResponseError):
assert r.substr("a", 0) == b"0123456789"
return
@@ -2665,7 +2666,7 @@ class TestRedisCommands:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("3.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_readwrite(self, r):
assert r.readwrite()
@@ -4016,7 +4017,7 @@ class TestRedisCommands:
@skip_if_server_version_lt("4.0.0")
def test_memory_malloc_stats(self, r):
- if skip_if_redis_enterprise(None).args[0] is True:
+ if skip_if_redis_enterprise().args[0] is True:
with pytest.raises(redis.exceptions.ResponseError):
assert r.memory_malloc_stats()
return
@@ -4029,7 +4030,7 @@ class TestRedisCommands:
# has data
r.set("foo", "bar")
- if skip_if_redis_enterprise(None).args[0] is True:
+ if skip_if_redis_enterprise().args[0] is True:
with pytest.raises(redis.exceptions.ResponseError):
stats = r.memory_stats()
return
@@ -4047,7 +4048,7 @@ class TestRedisCommands:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("4.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_module_list(self, r):
assert isinstance(r.module_list(), list)
for x in r.module_list():
@@ -4088,7 +4089,7 @@ class TestRedisCommands:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("4.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_module(self, r):
with pytest.raises(redis.exceptions.ModuleError) as excinfo:
r.module_load("/some/fake/path")
@@ -4144,7 +4145,7 @@ class TestRedisCommands:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("5.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_replicaof(self, r):
with pytest.raises(redis.ResponseError):
assert r.replicaof("NO ONE")
@@ -4226,7 +4227,7 @@ class TestBinarySave:
assert "6" in parsed["allocation_stats"]
assert ">=256" in parsed["allocation_stats"]
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_large_responses(self, r):
"The PythonParser has some special cases for return values > 1MB"
# load up 5MB of data into a key
diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py
index 138fcad..3e1fbae 100644
--- a/tests/test_connection_pool.py
+++ b/tests/test_connection_pool.py
@@ -514,7 +514,7 @@ class TestConnection:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("2.8.8")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_busy_loading_disconnects_socket(self, r):
"""
If Redis raises a LOADING error, the connection should be
@@ -526,7 +526,7 @@ class TestConnection:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("2.8.8")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_busy_loading_from_pipeline_immediate_command(self, r):
"""
BusyLoadingErrors should raise from Pipelines that execute a
@@ -542,7 +542,7 @@ class TestConnection:
@pytest.mark.onlynoncluster
@skip_if_server_version_lt("2.8.8")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_busy_loading_from_pipeline(self, r):
"""
BusyLoadingErrors should be raised from a pipeline execution
@@ -558,7 +558,7 @@ class TestConnection:
assert not pool._available_connections[0]._sock
@skip_if_server_version_lt("2.8.8")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_read_only_error(self, r):
"READONLY errors get turned in ReadOnlyError exceptions"
with pytest.raises(redis.ReadOnlyError):
@@ -584,7 +584,7 @@ class TestConnection:
"path=/path/to/socket,db=0",
)
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_connect_no_auth_supplied_when_required(self, r):
"""
AuthenticationError should be raised when the server requires a
@@ -595,7 +595,7 @@ class TestConnection:
"DEBUG", "ERROR", "ERR Client sent AUTH, but no password is set"
)
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_connect_invalid_password_supplied(self, r):
"AuthenticationError should be raised when sending the wrong password"
with pytest.raises(redis.AuthenticationError):
diff --git a/tests/test_monitor.py b/tests/test_monitor.py
index 40d9e43..9b07c80 100644
--- a/tests/test_monitor.py
+++ b/tests/test_monitor.py
@@ -47,7 +47,7 @@ class TestMonitor:
response = wait_for_command(r, m, "GET foo\\\\x92")
assert response["command"] == "GET foo\\\\x92"
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_lua_script(self, r):
with r.monitor() as m:
script = 'return redis.call("GET", "foo")'
@@ -58,7 +58,7 @@ class TestMonitor:
assert response["client_address"] == "lua"
assert response["client_port"] == ""
- @skip_ifnot_redis_enterprise
+ @skip_ifnot_redis_enterprise()
def test_lua_script_in_enterprise(self, r):
with r.monitor() as m:
script = 'return redis.call("GET", "foo")'
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index 6df0faf..20ae0a0 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -530,7 +530,7 @@ class TestPubSubPings:
@pytest.mark.onlynoncluster
class TestPubSubConnectionKilled:
@skip_if_server_version_lt("3.0.0")
- @skip_if_redis_enterprise
+ @skip_if_redis_enterprise()
def test_connection_error_raised_when_connection_dies(self, r):
p = r.pubsub()
p.subscribe("foo")