diff options
Diffstat (limited to 'tests/test_asyncio/test_cluster.py')
-rw-r--r-- | tests/test_asyncio/test_cluster.py | 2232 |
1 files changed, 2232 insertions, 0 deletions
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py new file mode 100644 index 0000000..6543e28 --- /dev/null +++ b/tests/test_asyncio/test_cluster.py @@ -0,0 +1,2232 @@ +import asyncio +import binascii +import datetime +import sys +import warnings + +import pytest + +from .compat import mock + +if sys.version_info[0:2] == (3, 6): + import pytest as pytest_asyncio +else: + import pytest_asyncio + +from typing import Callable, Dict, List, Optional, Type, Union + +from _pytest.fixtures import FixtureRequest, SubRequest + +from redis.asyncio import Connection, RedisCluster +from redis.asyncio.cluster import ( + PRIMARY, + REDIS_CLUSTER_HASH_SLOTS, + REPLICA, + ClusterNode, + NodesManager, + get_node_name, +) +from redis.asyncio.parser import CommandsParser +from redis.crc import key_slot +from redis.exceptions import ( + AskError, + ClusterDownError, + ConnectionError, + DataError, + MovedError, + NoPermissionError, + RedisClusterException, + RedisError, + ResponseError, +) +from redis.utils import str_if_bytes +from tests.conftest import ( + skip_if_redis_enterprise, + skip_if_server_version_lt, + skip_unless_arch_bits, +) + +pytestmark = pytest.mark.asyncio + +default_host = "127.0.0.1" +default_port = 7000 +default_cluster_slots = [ + [0, 8191, ["127.0.0.1", 7000, "node_0"], ["127.0.0.1", 7003, "node_3"]], + [8192, 16383, ["127.0.0.1", 7001, "node_1"], ["127.0.0.1", 7002, "node_2"]], +] + + +@pytest_asyncio.fixture() +async def slowlog(request: SubRequest, r: RedisCluster) -> None: + """ + Set the slowlog threshold to 0, and the + max length to 128. This will force every + command into the slowlog and allow us + to test it + """ + # Save old values + current_config = await r.config_get(target_nodes=r.get_primaries()[0]) + old_slower_than_value = current_config["slowlog-log-slower-than"] + old_max_length_value = current_config["slowlog-max-len"] + + # Set the new values + await r.config_set("slowlog-log-slower-than", 0) + await r.config_set("slowlog-max-len", 128) + + yield + + await r.config_set("slowlog-log-slower-than", old_slower_than_value) + await r.config_set("slowlog-max-len", old_max_length_value) + + +async def get_mocked_redis_client(*args, **kwargs) -> RedisCluster: + """ + Return a stable RedisCluster object that have deterministic + nodes and slots setup to remove the problem of different IP addresses + on different installations and machines. + """ + cluster_slots = kwargs.pop("cluster_slots", default_cluster_slots) + coverage_res = kwargs.pop("coverage_result", "yes") + cluster_enabled = kwargs.pop("cluster_enabled", True) + with mock.patch.object(ClusterNode, "execute_command") as execute_command_mock: + + async def execute_command(*_args, **_kwargs): + if _args[0] == "CLUSTER SLOTS": + mock_cluster_slots = cluster_slots + return mock_cluster_slots + elif _args[0] == "COMMAND": + return {"get": [], "set": []} + elif _args[0] == "INFO": + return {"cluster_enabled": cluster_enabled} + elif len(_args) > 1 and _args[1] == "cluster-require-full-coverage": + return {"cluster-require-full-coverage": coverage_res} + else: + return await execute_command_mock(*_args, **_kwargs) + + execute_command_mock.side_effect = execute_command + + with mock.patch.object( + CommandsParser, "initialize", autospec=True + ) as cmd_parser_initialize: + + def cmd_init_mock(self, r): + self.commands = { + "GET": { + "name": "get", + "arity": 2, + "flags": ["readonly", "fast"], + "first_key_pos": 1, + "last_key_pos": 1, + "step_count": 1, + } + } + + cmd_parser_initialize.side_effect = cmd_init_mock + + return await RedisCluster(*args, **kwargs) + + +def mock_node_resp( + node: ClusterNode, + response: Union[ + List[List[Union[int, List[Union[str, int]]]]], List[bytes], str, int + ], +) -> ClusterNode: + connection = mock.AsyncMock() + connection.is_connected = True + connection.read_response_without_lock.return_value = response + while node._free: + node._free.pop() + node._free.append(connection) + return node + + +def mock_all_nodes_resp( + rc: RedisCluster, + response: Union[ + List[List[Union[int, List[Union[str, int]]]]], List[bytes], int, str + ], +) -> RedisCluster: + for node in rc.get_nodes(): + mock_node_resp(node, response) + return rc + + +async def moved_redirection_helper( + request: FixtureRequest, create_redis: Callable, failover: bool = False +) -> None: + """ + Test that the client handles MOVED response after a failover. + Redirection after a failover means that the redirection address is of a + replica that was promoted to a primary. + + At first call it should return a MOVED ResponseError that will point + the client to the next server it should talk to. + + Verify that: + 1. it tries to talk to the redirected node + 2. it updates the slot's primary to the redirected node + + For a failover, also verify: + 3. the redirected node's server type updated to 'primary' + 4. the server type of the previous slot owner updated to 'replica' + """ + rc = await create_redis(cls=RedisCluster, flushdb=False) + slot = 12182 + redirect_node = None + # Get the current primary that holds this slot + prev_primary = rc.nodes_manager.get_node_from_slot(slot) + if failover: + if len(rc.nodes_manager.slots_cache[slot]) < 2: + warnings.warn("Skipping this test since it requires to have a " "replica") + return + redirect_node = rc.nodes_manager.slots_cache[slot][1] + else: + # Use one of the primaries to be the redirected node + redirect_node = rc.get_primaries()[0] + r_host = redirect_node.host + r_port = redirect_node.port + with mock.patch.object( + ClusterNode, "execute_command", autospec=True + ) as execute_command: + + def moved_redirect_effect(self, *args, **options): + def ok_response(self, *args, **options): + assert self.host == r_host + assert self.port == r_port + + return "MOCK_OK" + + execute_command.side_effect = ok_response + raise MovedError(f"{slot} {r_host}:{r_port}") + + execute_command.side_effect = moved_redirect_effect + assert await rc.execute_command("SET", "foo", "bar") == "MOCK_OK" + slot_primary = rc.nodes_manager.slots_cache[slot][0] + assert slot_primary == redirect_node + if failover: + assert rc.get_node(host=r_host, port=r_port).server_type == PRIMARY + assert prev_primary.server_type == REPLICA + + +@pytest.mark.onlycluster +class TestRedisClusterObj: + """ + Tests for the RedisCluster class + """ + + async def test_host_port_startup_node(self) -> None: + """ + Test that it is possible to use host & port arguments as startup node + args + """ + cluster = await get_mocked_redis_client(host=default_host, port=default_port) + assert cluster.get_node(host=default_host, port=default_port) is not None + + await cluster.close() + + async def test_startup_nodes(self) -> None: + """ + Test that it is possible to use startup_nodes + argument to init the cluster + """ + port_1 = 7000 + port_2 = 7001 + startup_nodes = [ + ClusterNode(default_host, port_1), + ClusterNode(default_host, port_2), + ] + cluster = await get_mocked_redis_client(startup_nodes=startup_nodes) + assert ( + cluster.get_node(host=default_host, port=port_1) is not None + and cluster.get_node(host=default_host, port=port_2) is not None + ) + + await cluster.close() + + async def test_empty_startup_nodes(self) -> None: + """ + Test that exception is raised when empty providing empty startup_nodes + """ + with pytest.raises(RedisClusterException) as ex: + RedisCluster(startup_nodes=[]) + + assert str(ex.value).startswith( + "RedisCluster requires at least one node to discover the " "cluster" + ), str_if_bytes(ex.value) + + async def test_from_url(self, r: RedisCluster) -> None: + redis_url = f"redis://{default_host}:{default_port}/0" + with mock.patch.object(RedisCluster, "from_url") as from_url: + + async def from_url_mocked(_url, **_kwargs): + return await get_mocked_redis_client(url=_url, **_kwargs) + + from_url.side_effect = from_url_mocked + cluster = await RedisCluster.from_url(redis_url) + assert cluster.get_node(host=default_host, port=default_port) is not None + + await cluster.close() + + async def test_execute_command_errors(self, r: RedisCluster) -> None: + """ + Test that if no key is provided then exception should be raised. + """ + with pytest.raises(RedisClusterException) as ex: + await r.execute_command("GET") + assert str(ex.value).startswith( + "No way to dispatch this command to " "Redis Cluster. Missing key." + ) + + async def test_execute_command_node_flag_primaries(self, r: RedisCluster) -> None: + """ + Test command execution with nodes flag PRIMARIES + """ + primaries = r.get_primaries() + replicas = r.get_replicas() + mock_all_nodes_resp(r, "PONG") + assert await r.ping(target_nodes=RedisCluster.PRIMARIES) is True + for primary in primaries: + conn = primary._free.pop() + assert conn.read_response_without_lock.called is True + for replica in replicas: + conn = replica._free.pop() + assert conn.read_response_without_lock.called is not True + + async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None: + """ + Test command execution with nodes flag REPLICAS + """ + replicas = r.get_replicas() + if not replicas: + r = await get_mocked_redis_client(default_host, default_port) + primaries = r.get_primaries() + mock_all_nodes_resp(r, "PONG") + assert await r.ping(target_nodes=RedisCluster.REPLICAS) is True + for replica in replicas: + conn = replica._free.pop() + assert conn.read_response_without_lock.called is True + for primary in primaries: + conn = primary._free.pop() + assert conn.read_response_without_lock.called is not True + + await r.close() + + async def test_execute_command_node_flag_all_nodes(self, r: RedisCluster) -> None: + """ + Test command execution with nodes flag ALL_NODES + """ + mock_all_nodes_resp(r, "PONG") + assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True + for node in r.get_nodes(): + conn = node._free.pop() + assert conn.read_response_without_lock.called is True + + async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None: + """ + Test command execution with nodes flag RANDOM + """ + mock_all_nodes_resp(r, "PONG") + assert await r.ping(target_nodes=RedisCluster.RANDOM) is True + called_count = 0 + for node in r.get_nodes(): + conn = node._free.pop() + if conn.read_response_without_lock.called is True: + called_count += 1 + assert called_count == 1 + + async def test_execute_command_default_node(self, r: RedisCluster) -> None: + """ + Test command execution without node flag is being executed on the + default node + """ + def_node = r.get_default_node() + mock_node_resp(def_node, "PONG") + assert await r.ping() is True + conn = def_node._free.pop() + assert conn.read_response_without_lock.called + + async def test_ask_redirection(self, r: RedisCluster) -> None: + """ + Test that the server handles ASK response. + + At first call it should return a ASK ResponseError that will point + the client to the next server it should talk to. + + Important thing to verify is that it tries to talk to the second node. + """ + redirect_node = r.get_nodes()[0] + with mock.patch.object( + ClusterNode, "execute_command", autospec=True + ) as execute_command: + + def ask_redirect_effect(self, *args, **options): + def ok_response(self, *args, **options): + assert self.host == redirect_node.host + assert self.port == redirect_node.port + + return "MOCK_OK" + + execute_command.side_effect = ok_response + raise AskError(f"12182 {redirect_node.host}:{redirect_node.port}") + + execute_command.side_effect = ask_redirect_effect + + assert await r.execute_command("SET", "foo", "bar") == "MOCK_OK" + + async def test_moved_redirection( + self, request: FixtureRequest, create_redis: Callable + ) -> None: + """ + Test that the client handles MOVED response. + """ + await moved_redirection_helper(request, create_redis, failover=False) + + async def test_moved_redirection_after_failover( + self, request: FixtureRequest, create_redis: Callable + ) -> None: + """ + Test that the client handles MOVED response after a failover. + """ + await moved_redirection_helper(request, create_redis, failover=True) + + async def test_refresh_using_specific_nodes( + self, request: FixtureRequest, create_redis: Callable + ) -> None: + """ + Test making calls on specific nodes when the cluster has failed over to + another node + """ + node_7006 = ClusterNode(host=default_host, port=7006, server_type=PRIMARY) + node_7007 = ClusterNode(host=default_host, port=7007, server_type=PRIMARY) + with mock.patch.object( + ClusterNode, "execute_command", autospec=True + ) as execute_command: + with mock.patch.object( + NodesManager, "initialize", autospec=True + ) as initialize: + with mock.patch.multiple( + Connection, + send_packed_command=mock.DEFAULT, + connect=mock.DEFAULT, + can_read=mock.DEFAULT, + ) as mocks: + # simulate 7006 as a failed node + def execute_command_mock(self, *args, **options): + if self.port == 7006: + execute_command.failed_calls += 1 + raise ClusterDownError( + "CLUSTERDOWN The cluster is " + "down. Use CLUSTER INFO for " + "more information" + ) + elif self.port == 7007: + execute_command.successful_calls += 1 + + def initialize_mock(self): + # start with all slots mapped to 7006 + self.nodes_cache = {node_7006.name: node_7006} + self.default_node = node_7006 + self.slots_cache = {} + + for i in range(0, 16383): + self.slots_cache[i] = [node_7006] + + # After the first connection fails, a reinitialize + # should follow the cluster to 7007 + def map_7007(self): + self.nodes_cache = {node_7007.name: node_7007} + self.default_node = node_7007 + self.slots_cache = {} + + for i in range(0, 16383): + self.slots_cache[i] = [node_7007] + + # Change initialize side effect for the second call + initialize.side_effect = map_7007 + + execute_command.side_effect = execute_command_mock + execute_command.successful_calls = 0 + execute_command.failed_calls = 0 + initialize.side_effect = initialize_mock + mocks["can_read"].return_value = False + mocks["send_packed_command"].return_value = "MOCK_OK" + mocks["connect"].return_value = None + with mock.patch.object( + CommandsParser, "initialize", autospec=True + ) as cmd_parser_initialize: + + def cmd_init_mock(self, r): + self.commands = { + "GET": { + "name": "get", + "arity": 2, + "flags": ["readonly", "fast"], + "first_key_pos": 1, + "last_key_pos": 1, + "step_count": 1, + } + } + + cmd_parser_initialize.side_effect = cmd_init_mock + + rc = await create_redis(cls=RedisCluster, flushdb=False) + assert len(rc.get_nodes()) == 1 + assert rc.get_node(node_name=node_7006.name) is not None + + await rc.get("foo") + + # Cluster should now point to 7007, and there should be + # one failed and one successful call + assert len(rc.get_nodes()) == 1 + assert rc.get_node(node_name=node_7007.name) is not None + assert rc.get_node(node_name=node_7006.name) is None + assert execute_command.failed_calls == 1 + assert execute_command.successful_calls == 1 + + async def test_reading_from_replicas_in_round_robin(self) -> None: + with mock.patch.multiple( + Connection, + send_command=mock.DEFAULT, + read_response_without_lock=mock.DEFAULT, + _connect=mock.DEFAULT, + can_read=mock.DEFAULT, + on_connect=mock.DEFAULT, + ) as mocks: + with mock.patch.object( + ClusterNode, "execute_command", autospec=True + ) as execute_command: + + async def execute_command_mock_first(self, *args, **options): + await self.connection_class(**self.connection_kwargs).connect() + # Primary + assert self.port == 7001 + execute_command.side_effect = execute_command_mock_second + return "MOCK_OK" + + def execute_command_mock_second(self, *args, **options): + # Replica + assert self.port == 7002 + execute_command.side_effect = execute_command_mock_third + return "MOCK_OK" + + def execute_command_mock_third(self, *args, **options): + # Primary + assert self.port == 7001 + return "MOCK_OK" + + # We don't need to create a real cluster connection but we + # do want RedisCluster.on_connect function to get called, + # so we'll mock some of the Connection's functions to allow it + execute_command.side_effect = execute_command_mock_first + mocks["send_command"].return_value = True + mocks["read_response_without_lock"].return_value = "OK" + mocks["_connect"].return_value = True + mocks["can_read"].return_value = False + mocks["on_connect"].return_value = True + + # Create a cluster with reading from replications + read_cluster = await get_mocked_redis_client( + host=default_host, port=default_port, read_from_replicas=True + ) + assert read_cluster.read_from_replicas is True + # Check that we read from the slot's nodes in a round robin + # matter. + # 'foo' belongs to slot 12182 and the slot's nodes are: + # [(127.0.0.1,7001,primary), (127.0.0.1,7002,replica)] + await read_cluster.get("foo") + await read_cluster.get("foo") + await read_cluster.get("foo") + mocks["send_command"].assert_has_calls([mock.call("READONLY")]) + + await read_cluster.close() + + async def test_keyslot(self, r: RedisCluster) -> None: + """ + Test that method will compute correct key in all supported cases + """ + assert r.keyslot("foo") == 12182 + assert r.keyslot("{foo}bar") == 12182 + assert r.keyslot("{foo}") == 12182 + assert r.keyslot(1337) == 4314 + + assert r.keyslot(125) == r.keyslot(b"125") + assert r.keyslot(125) == r.keyslot("\x31\x32\x35") + assert r.keyslot("大奖") == r.keyslot(b"\xe5\xa4\xa7\xe5\xa5\x96") + assert r.keyslot("大奖") == r.keyslot(b"\xe5\xa4\xa7\xe5\xa5\x96") + assert r.keyslot(1337.1234) == r.keyslot("1337.1234") + assert r.keyslot(1337) == r.keyslot("1337") + assert r.keyslot(b"abc") == r.keyslot("abc") + + async def test_get_node_name(self) -> None: + assert ( + get_node_name(default_host, default_port) + == f"{default_host}:{default_port}" + ) + + async def test_all_nodes(self, r: RedisCluster) -> None: + """ + Set a list of nodes and it should be possible to iterate over all + """ + nodes = [node for node in r.nodes_manager.nodes_cache.values()] + + for i, node in enumerate(r.get_nodes()): + assert node in nodes + + async def test_all_nodes_masters(self, r: RedisCluster) -> None: + """ + Set a list of nodes with random primaries/replicas config and it shold + be possible to iterate over all of them. + """ + nodes = [ + node + for node in r.nodes_manager.nodes_cache.values() + if node.server_type == PRIMARY + ] + + for node in r.get_primaries(): + assert node in nodes + + @pytest.mark.parametrize("error", RedisCluster.ERRORS_ALLOW_RETRY) + async def test_cluster_down_overreaches_retry_attempts( + self, + error: Union[Type[TimeoutError], Type[ClusterDownError], Type[ConnectionError]], + ) -> None: + """ + When error that allows retry is thrown, test that we retry executing + the command as many times as configured in cluster_error_retry_attempts + and then raise the exception + """ + with mock.patch.object(RedisCluster, "_execute_command") as execute_command: + + def raise_error(target_node, *args, **kwargs): + execute_command.failed_calls += 1 + raise error("mocked error") + + execute_command.side_effect = raise_error + + rc = await get_mocked_redis_client(host=default_host, port=default_port) + + with pytest.raises(error): + await rc.get("bar") + assert execute_command.failed_calls == rc.cluster_error_retry_attempts + + await rc.close() + + async def test_set_default_node_success(self, r: RedisCluster) -> None: + """ + test successful replacement of the default cluster node + """ + default_node = r.get_default_node() + # get a different node + new_def_node = None + for node in r.get_nodes(): + if node != default_node: + new_def_node = node + break + r.set_default_node(new_def_node) + assert r.get_default_node() == new_def_node + + async def test_set_default_node_failure(self, r: RedisCluster) -> None: + """ + test failed replacement of the default cluster node + """ + default_node = r.get_default_node() + new_def_node = ClusterNode("1.1.1.1", 1111) + with pytest.raises(DataError): + r.set_default_node(None) + with pytest.raises(DataError): + r.set_default_node(new_def_node) + assert r.get_default_node() == default_node + + async def test_get_node_from_key(self, r: RedisCluster) -> None: + """ + Test that get_node_from_key function returns the correct node + """ + key = "bar" + slot = r.keyslot(key) + slot_nodes = r.nodes_manager.slots_cache.get(slot) + primary = slot_nodes[0] + assert r.get_node_from_key(key, replica=False) == primary + replica = r.get_node_from_key(key, replica=True) + if replica is not None: + assert replica.server_type == REPLICA + assert replica in slot_nodes + + @skip_if_redis_enterprise() + async def test_not_require_full_coverage_cluster_down_error( + self, r: RedisCluster + ) -> None: + """ + When require_full_coverage is set to False (default client config) and not + all slots are covered, if one of the nodes has 'cluster-require_full_coverage' + config set to 'yes' some key-based commands should throw ClusterDownError + """ + node = r.get_node_from_key("foo") + missing_slot = r.keyslot("foo") + assert await r.set("foo", "bar") is True + try: + assert all(await r.cluster_delslots(missing_slot)) + with pytest.raises(ClusterDownError): + await r.exists("foo") + finally: + try: + # Add back the missing slot + assert await r.cluster_addslots(node, missing_slot) is True + # Make sure we are not getting ClusterDownError anymore + assert await r.exists("foo") == 1 + except ResponseError as e: + if f"Slot {missing_slot} is already busy" in str(e): + # It can happen if the test failed to delete this slot + pass + else: + raise e + + async def test_can_run_concurrent_commands(self, r: RedisCluster) -> None: + assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True + assert all( + await asyncio.gather( + *(r.ping(target_nodes=RedisCluster.ALL_NODES) for _ in range(100)) + ) + ) + + +@pytest.mark.onlycluster +class TestClusterRedisCommands: + """ + Tests for RedisCluster unique commands + """ + + async def test_get_and_set(self, r: RedisCluster) -> None: + # get and set can't be tested independently of each other + assert await r.get("a") is None + byte_string = b"value" + integer = 5 + unicode_string = chr(3456) + "abcd" + chr(3421) + assert await r.set("byte_string", byte_string) + assert await r.set("integer", 5) + assert await r.set("unicode_string", unicode_string) + assert await r.get("byte_string") == byte_string + assert await r.get("integer") == str(integer).encode() + assert (await r.get("unicode_string")).decode("utf-8") == unicode_string + + async def test_mget_nonatomic(self, r: RedisCluster) -> None: + assert await r.mget_nonatomic([]) == [] + assert await r.mget_nonatomic(["a", "b"]) == [None, None] + await r.set("a", "1") + await r.set("b", "2") + await r.set("c", "3") + + assert await r.mget_nonatomic("a", "other", "b", "c") == [ + b"1", + None, + b"2", + b"3", + ] + + async def test_mset_nonatomic(self, r: RedisCluster) -> None: + d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"} + assert await r.mset_nonatomic(d) + for k, v in d.items(): + assert await r.get(k) == v + + async def test_config_set(self, r: RedisCluster) -> None: + assert await r.config_set("slowlog-log-slower-than", 0) + + async def test_cluster_config_resetstat(self, r: RedisCluster) -> None: + await r.ping(target_nodes="all") + all_info = await r.info(target_nodes="all") + prior_commands_processed = -1 + for node_info in all_info.values(): + prior_commands_processed = node_info["total_commands_processed"] + assert prior_commands_processed >= 1 + await r.config_resetstat(target_nodes="all") + all_info = await r.info(target_nodes="all") + for node_info in all_info.values(): + reset_commands_processed = node_info["total_commands_processed"] + assert reset_commands_processed < prior_commands_processed + + async def test_client_setname(self, r: RedisCluster) -> None: + node = r.get_random_node() + await r.client_setname("redis_py_test", target_nodes=node) + client_name = await r.client_getname(target_nodes=node) + assert client_name == "redis_py_test" + + async def test_exists(self, r: RedisCluster) -> None: + d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"} + await r.mset_nonatomic(d) + assert await r.exists(*d.keys()) == len(d) + + async def test_delete(self, r: RedisCluster) -> None: + d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"} + await r.mset_nonatomic(d) + assert await r.delete(*d.keys()) == len(d) + assert await r.delete(*d.keys()) == 0 + + async def test_touch(self, r: RedisCluster) -> None: + d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"} + await r.mset_nonatomic(d) + assert await r.touch(*d.keys()) == len(d) + + async def test_unlink(self, r: RedisCluster) -> None: + d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"} + await r.mset_nonatomic(d) + assert await r.unlink(*d.keys()) == len(d) + # Unlink is non-blocking so we sleep before + # verifying the deletion + await asyncio.sleep(0.1) + assert await r.unlink(*d.keys()) == 0 + + @skip_if_redis_enterprise() + async def test_cluster_myid(self, r: RedisCluster) -> None: + node = r.get_random_node() + myid = await r.cluster_myid(node) + assert len(myid) == 40 + + @skip_if_redis_enterprise() + async def test_cluster_slots(self, r: RedisCluster) -> None: + mock_all_nodes_resp(r, default_cluster_slots) + cluster_slots = await r.cluster_slots() + assert isinstance(cluster_slots, dict) + assert len(default_cluster_slots) == len(cluster_slots) + assert cluster_slots.get((0, 8191)) is not None + assert cluster_slots.get((0, 8191)).get("primary") == ("127.0.0.1", 7000) + + @skip_if_redis_enterprise() + async def test_cluster_addslots(self, r: RedisCluster) -> None: + node = r.get_random_node() + mock_node_resp(node, "OK") + assert await r.cluster_addslots(node, 1, 2, 3) is True + + @skip_if_server_version_lt("7.0.0") + @skip_if_redis_enterprise() + async def test_cluster_addslotsrange(self, r: RedisCluster): + node = r.get_random_node() + mock_node_resp(node, "OK") + assert await r.cluster_addslotsrange(node, 1, 5) + + @skip_if_redis_enterprise() + async def test_cluster_countkeysinslot(self, r: RedisCluster) -> None: + node = r.nodes_manager.get_node_from_slot(1) + mock_node_resp(node, 2) + assert await r.cluster_countkeysinslot(1) == 2 + + async def test_cluster_count_failure_report(self, r: RedisCluster) -> None: + mock_all_nodes_resp(r, 0) + assert await r.cluster_count_failure_report("node_0") == 0 + + @skip_if_redis_enterprise() + async def test_cluster_delslots(self) -> None: + cluster_slots = [ + [0, 8191, ["127.0.0.1", 7000, "node_0"]], + [8192, 16383, ["127.0.0.1", 7001, "node_1"]], + ] + r = await get_mocked_redis_client( + host=default_host, port=default_port, cluster_slots=cluster_slots + ) + mock_all_nodes_resp(r, "OK") + node0 = r.get_node(default_host, 7000) + node1 = r.get_node(default_host, 7001) + assert await r.cluster_delslots(0, 8192) == [True, True] + assert node0._free.pop().read_response_without_lock.called + assert node1._free.pop().read_response_without_lock.called + + await r.close() + + @skip_if_server_version_lt("7.0.0") + @skip_if_redis_enterprise() + async def test_cluster_delslotsrange(self, r: RedisCluster): + node = r.get_random_node() + mock_node_resp(node, "OK") + await r.cluster_addslots(node, 1, 2, 3, 4, 5) + assert await r.cluster_delslotsrange(1, 5) + + @skip_if_redis_enterprise() + async def test_cluster_failover(self, r: RedisCluster) -> None: + node = r.get_random_node() + mock_node_resp(node, "OK") + assert await r.cluster_failover(node) is True + assert await r.cluster_failover(node, "FORCE") is True + assert await r.cluster_failover(node, "TAKEOVER") is True + with pytest.raises(RedisError): + await r.cluster_failover(node, "FORCT") + + @skip_if_redis_enterprise() + async def test_cluster_info(self, r: RedisCluster) -> None: + info = await r.cluster_info() + assert isinstance(info, dict) + assert info["cluster_state"] == "ok" + + @skip_if_redis_enterprise() + async def test_cluster_keyslot(self, r: RedisCluster) -> None: + mock_all_nodes_resp(r, 12182) + assert await r.cluster_keyslot("foo") == 12182 + + @skip_if_redis_enterprise() + async def test_cluster_meet(self, r: RedisCluster) -> None: + node = r.get_default_node() + mock_node_resp(node, "OK") + assert await r.cluster_meet("127.0.0.1", 6379) is True + + @skip_if_redis_enterprise() + async def test_cluster_nodes(self, r: RedisCluster) -> None: + response = ( + "c8253bae761cb1ecb2b61857d85dfe455a0fec8b 172.17.0.7:7006 " + "slave aa90da731f673a99617dfe930306549a09f83a6b 0 " + "1447836263059 5 connected\n" + "9bd595fe4821a0e8d6b99d70faa660638a7612b3 172.17.0.7:7008 " + "master - 0 1447836264065 0 connected\n" + "aa90da731f673a99617dfe930306549a09f83a6b 172.17.0.7:7003 " + "myself,master - 0 0 2 connected 5461-10922\n" + "1df047e5a594f945d82fc140be97a1452bcbf93e 172.17.0.7:7007 " + "slave 19efe5a631f3296fdf21a5441680f893e8cc96ec 0 " + "1447836262556 3 connected\n" + "4ad9a12e63e8f0207025eeba2354bcf4c85e5b22 172.17.0.7:7005 " + "master - 0 1447836262555 7 connected 0-5460\n" + "19efe5a631f3296fdf21a5441680f893e8cc96ec 172.17.0.7:7004 " + "master - 0 1447836263562 3 connected 10923-16383\n" + "fbb23ed8cfa23f17eaf27ff7d0c410492a1093d6 172.17.0.7:7002 " + "master,fail - 1447829446956 1447829444948 1 disconnected\n" + ) + mock_all_nodes_resp(r, response) + nodes = await r.cluster_nodes() + assert len(nodes) == 7 + assert nodes.get("172.17.0.7:7006") is not None + assert ( + nodes.get("172.17.0.7:7006").get("node_id") + == "c8253bae761cb1ecb2b61857d85dfe455a0fec8b" + ) + + @skip_if_redis_enterprise() + async def test_cluster_nodes_importing_migrating(self, r: RedisCluster) -> None: + response = ( + "488ead2fcce24d8c0f158f9172cb1f4a9e040fe5 127.0.0.1:16381@26381 " + "master - 0 1648975557664 3 connected 10923-16383\n" + "8ae2e70812db80776f739a72374e57fc4ae6f89d 127.0.0.1:16380@26380 " + "master - 0 1648975555000 2 connected 1 5461-10922 [" + "2-<-ed8007ccfa2d91a7b76f8e6fba7ba7e257034a16]\n" + "ed8007ccfa2d91a7b76f8e6fba7ba7e257034a16 127.0.0.1:16379@26379 " + "myself,master - 0 1648975556000 1 connected 0 2-5460 [" + "2->-8ae2e70812db80776f739a72374e57fc4ae6f89d]\n" + ) + mock_all_nodes_resp(r, response) + nodes = await r.cluster_nodes() + assert len(nodes) == 3 + node_16379 = nodes.get("127.0.0.1:16379") + node_16380 = nodes.get("127.0.0.1:16380") + node_16381 = nodes.get("127.0.0.1:16381") + assert node_16379.get("migrations") == [ + { + "slot": "2", + "node_id": "8ae2e70812db80776f739a72374e57fc4ae6f89d", + "state": "migrating", + } + ] + assert node_16379.get("slots") == [["0"], ["2", "5460"]] + assert node_16380.get("migrations") == [ + { + "slot": "2", + "node_id": "ed8007ccfa2d91a7b76f8e6fba7ba7e257034a16", + "state": "importing", + } + ] + assert node_16380.get("slots") == [["1"], ["5461", "10922"]] + assert node_16381.get("slots") == [["10923", "16383"]] + assert node_16381.get("migrations") == [] + + @skip_if_redis_enterprise() + async def test_cluster_replicate(self, r: RedisCluster) -> None: + node = r.get_random_node() + all_replicas = r.get_replicas() + mock_all_nodes_resp(r, "OK") + assert await r.cluster_replicate(node, "c8253bae761cb61857d") is True + results = await r.cluster_replicate(all_replicas, "c8253bae761cb61857d") + if isinstance(results, dict): + for res in results.values(): + assert res is True + else: + assert results is True + + @skip_if_redis_enterprise() + async def test_cluster_reset(self, r: RedisCluster) -> None: + mock_all_nodes_resp(r, "OK") + assert await r.cluster_reset() is True + assert await r.cluster_reset(False) is True + all_results = await r.cluster_reset(False, target_nodes="all") + for res in all_results.values(): + assert res is True + + @skip_if_redis_enterprise() + async def test_cluster_save_config(self, r: RedisCluster) -> None: + node = r.get_random_node() + all_nodes = r.get_nodes() + mock_all_nodes_resp(r, "OK") + assert await r.cluster_save_config(node) is True + all_results = await r.cluster_save_config(all_nodes) + for res in all_results.values(): + assert res is True + + @skip_if_redis_enterprise() + async def test_cluster_get_keys_in_slot(self, r: RedisCluster) -> None: + response = ["{foo}1", "{foo}2"] + node = r.nodes_manager.get_node_from_slot(12182) + mock_node_resp(node, response) + keys = await r.cluster_get_keys_in_slot(12182, 4) + assert keys == response + + @skip_if_redis_enterprise() + async def test_cluster_set_config_epoch(self, r: RedisCluster) -> None: + mock_all_nodes_resp(r, "OK") + assert await r.cluster_set_config_epoch(3) is True + all_results = await r.cluster_set_config_epoch(3, target_nodes="all") + for res in all_results.values(): + assert res is True + + @skip_if_redis_enterprise() + async def test_cluster_setslot(self, r: RedisCluster) -> None: + node = r.get_random_node() + mock_node_resp(node, "OK") + assert await r.cluster_setslot(node, "node_0", 1218, "IMPORTING") is True + assert await r.cluster_setslot(node, "node_0", 1218, "NODE") is True + assert await r.cluster_setslot(node, "node_0", 1218, "MIGRATING") is True + with pytest.raises(RedisError): + await r.cluster_failover(node, "STABLE") + with pytest.raises(RedisError): + await r.cluster_failover(node, "STATE") + + async def test_cluster_setslot_stable(self, r: RedisCluster) -> None: + node = r.nodes_manager.get_node_from_slot(12182) + mock_node_resp(node, "OK") + assert await r.cluster_setslot_stable(12182) is True + assert node._free.pop().read_response_without_lock.called + + @skip_if_redis_enterprise() + async def test_cluster_replicas(self, r: RedisCluster) -> None: + response = [ + b"01eca22229cf3c652b6fca0d09ff6941e0d2e3 " + b"127.0.0.1:6377@16377 slave " + b"52611e796814b78e90ad94be9d769a4f668f9a 0 " + b"1634550063436 4 connected", + b"r4xfga22229cf3c652b6fca0d09ff69f3e0d4d " + b"127.0.0.1:6378@16378 slave " + b"52611e796814b78e90ad94be9d769a4f668f9a 0 " + b"1634550063436 4 connected", + ] + mock_all_nodes_resp(r, response) + replicas = await r.cluster_replicas("52611e796814b78e90ad94be9d769a4f668f9a") + assert replicas.get("127.0.0.1:6377") is not None + assert replicas.get("127.0.0.1:6378") is not None + assert ( + replicas.get("127.0.0.1:6378").get("node_id") + == "r4xfga22229cf3c652b6fca0d09ff69f3e0d4d" + ) + + @skip_if_server_version_lt("7.0.0") + async def test_cluster_links(self, r: RedisCluster): + node = r.get_random_node() + res = await r.cluster_links(node) + links_to = sum(x.count("to") for x in res) + links_for = sum(x.count("from") for x in res) + assert links_to == links_for + for i in range(0, len(res) - 1, 2): + assert res[i][3] == res[i + 1][3] + + @skip_if_redis_enterprise() + async def test_readonly(self) -> None: + r = await get_mocked_redis_client(host=default_host, port=default_port) + mock_all_nodes_resp(r, "OK") + assert await r.readonly() is True + all_replicas_results = await r.readonly(target_nodes="replicas") + for res in all_replicas_results.values(): + assert res is True + for replica in r.get_replicas(): + assert replica._free.pop().read_response_without_lock.called + + await r.close() + + @skip_if_redis_enterprise() + async def test_readwrite(self) -> None: + r = await get_mocked_redis_client(host=default_host, port=default_port) + mock_all_nodes_resp(r, "OK") + assert await r.readwrite() is True + all_replicas_results = await r.readwrite(target_nodes="replicas") + for res in all_replicas_results.values(): + assert res is True + for replica in r.get_replicas(): + assert replica._free.pop().read_response_without_lock.called + + await r.close() + + @skip_if_redis_enterprise() + async def test_bgsave(self, r: RedisCluster) -> None: + assert await r.bgsave() + await asyncio.sleep(0.3) + assert await r.bgsave(True) + + async def test_info(self, r: RedisCluster) -> None: + # Map keys to same slot + await r.set("x{1}", 1) + await r.set("y{1}", 2) + await r.set("z{1}", 3) + # Get node that handles the slot + slot = r.keyslot("x{1}") + node = r.nodes_manager.get_node_from_slot(slot) + # Run info on that node + info = await r.info(target_nodes=node) + assert isinstance(info, dict) + assert info["db0"]["keys"] == 3 + + async def _init_slowlog_test(self, r: RedisCluster, node: ClusterNode) -> str: + slowlog_lim = await r.config_get("slowlog-log-slower-than", target_nodes=node) + assert ( + await r.config_set("slowlog-log-slower-than", 0, target_nodes=node) is True + ) + return slowlog_lim["slowlog-log-slower-than"] + + async def _teardown_slowlog_test( + self, r: RedisCluster, node: ClusterNode, prev_limit: str + ) -> None: + assert ( + await r.config_set("slowlog-log-slower-than", prev_limit, target_nodes=node) + is True + ) + + async def test_slowlog_get( + self, r: RedisCluster, slowlog: Optional[List[Dict[str, Union[int, bytes]]]] + ) -> None: + unicode_string = chr(3456) + "abcd" + chr(3421) + node = r.get_node_from_key(unicode_string) + slowlog_limit = await self._init_slowlog_test(r, node) + assert await r.slowlog_reset(target_nodes=node) + await r.get(unicode_string) + slowlog = await r.slowlog_get(target_nodes=node) + assert isinstance(slowlog, list) + commands = [log["command"] for log in slowlog] + + get_command = b" ".join((b"GET", unicode_string.encode("utf-8"))) + assert get_command in commands + assert b"SLOWLOG RESET" in commands + + # the order should be ['GET <uni string>', 'SLOWLOG RESET'], + # but if other clients are executing commands at the same time, there + # could be commands, before, between, or after, so just check that + # the two we care about are in the appropriate order. + assert commands.index(get_command) < commands.index(b"SLOWLOG RESET") + + # make sure other attributes are typed correctly + assert isinstance(slowlog[0]["start_time"], int) + assert isinstance(slowlog[0]["duration"], int) + # rollback the slowlog limit to its original value + await self._teardown_slowlog_test(r, node, slowlog_limit) + + async def test_slowlog_get_limit( + self, r: RedisCluster, slowlog: Optional[List[Dict[str, Union[int, bytes]]]] + ) -> None: + assert await r.slowlog_reset() + node = r.get_node_from_key("foo") + slowlog_limit = await self._init_slowlog_test(r, node) + await r.get("foo") + slowlog = await r.slowlog_get(1, target_nodes=node) + assert isinstance(slowlog, list) + # only one command, based on the number we passed to slowlog_get() + assert len(slowlog) == 1 + await self._teardown_slowlog_test(r, node, slowlog_limit) + + async def test_slowlog_length(self, r: RedisCluster, slowlog: None) -> None: + await r.get("foo") + node = r.nodes_manager.get_node_from_slot(key_slot(b"foo")) + slowlog_len = await r.slowlog_len(target_nodes=node) + assert isinstance(slowlog_len, int) + + async def test_time(self, r: RedisCluster) -> None: + t = await r.time(target_nodes=r.get_primaries()[0]) + assert len(t) == 2 + assert isinstance(t[0], int) + assert isinstance(t[1], int) + + @skip_if_server_version_lt("4.0.0") + async def test_memory_usage(self, r: RedisCluster) -> None: + await r.set("foo", "bar") + assert isinstance(await r.memory_usage("foo"), int) + + @skip_if_server_version_lt("4.0.0") + @skip_if_redis_enterprise() + async def test_memory_malloc_stats(self, r: RedisCluster) -> None: + assert await r.memory_malloc_stats() + + @skip_if_server_version_lt("4.0.0") + @skip_if_redis_enterprise() + async def test_memory_stats(self, r: RedisCluster) -> None: + # put a key into the current db to make sure that "db.<current-db>" + # has data + await r.set("foo", "bar") + node = r.nodes_manager.get_node_from_slot(key_slot(b"foo")) + stats = await r.memory_stats(target_nodes=node) + assert isinstance(stats, dict) + for key, value in stats.items(): + if key.startswith("db."): + assert isinstance(value, dict) + + @skip_if_server_version_lt("4.0.0") + async def test_memory_help(self, r: RedisCluster) -> None: + with pytest.raises(NotImplementedError): + await r.memory_help() + + @skip_if_server_version_lt("4.0.0") + async def test_memory_doctor(self, r: RedisCluster) -> None: + with pytest.raises(NotImplementedError): + await r.memory_doctor() + + @skip_if_redis_enterprise() + async def test_lastsave(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + assert isinstance(await r.lastsave(target_nodes=node), datetime.datetime) + + async def test_cluster_echo(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + assert await r.echo("foo bar", target_nodes=node) == b"foo bar" + + @skip_if_server_version_lt("1.0.0") + async def test_debug_segfault(self, r: RedisCluster) -> None: + with pytest.raises(NotImplementedError): + await r.debug_segfault() + + async def test_config_resetstat(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + await r.ping(target_nodes=node) + prior_commands_processed = int( + (await r.info(target_nodes=node))["total_commands_processed"] + ) + assert prior_commands_processed >= 1 + await r.config_resetstat(target_nodes=node) + reset_commands_processed = int( + (await r.info(target_nodes=node))["total_commands_processed"] + ) + assert reset_commands_processed < prior_commands_processed + + @skip_if_server_version_lt("6.2.0") + async def test_client_trackinginfo(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + res = await r.client_trackinginfo(target_nodes=node) + assert len(res) > 2 + assert "prefixes" in res + + @skip_if_server_version_lt("2.9.50") + async def test_client_pause(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + assert await r.client_pause(1, target_nodes=node) + assert await r.client_pause(timeout=1, target_nodes=node) + with pytest.raises(RedisError): + await r.client_pause(timeout="not an integer", target_nodes=node) + + @skip_if_server_version_lt("6.2.0") + @skip_if_redis_enterprise() + async def test_client_unpause(self, r: RedisCluster) -> None: + assert await r.client_unpause() + + @skip_if_server_version_lt("5.0.0") + async def test_client_id(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + assert await r.client_id(target_nodes=node) > 0 + + @skip_if_server_version_lt("5.0.0") + async def test_client_unblock(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + myid = await r.client_id(target_nodes=node) + assert not await r.client_unblock(myid, target_nodes=node) + assert not await r.client_unblock(myid, error=True, target_nodes=node) + assert not await r.client_unblock(myid, error=False, target_nodes=node) + + @skip_if_server_version_lt("6.0.0") + async def test_client_getredir(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + assert isinstance(await r.client_getredir(target_nodes=node), int) + assert await r.client_getredir(target_nodes=node) == -1 + + @skip_if_server_version_lt("6.2.0") + async def test_client_info(self, r: RedisCluster) -> None: + node = r.get_primaries()[0] + info = await r.client_info(target_nodes=node) + assert isinstance(info, dict) + assert "addr" in info + + @skip_if_server_version_lt("2.6.9") + async def test_client_kill(self, r: RedisCluster, r2: RedisCluster) -> None: + node = r.get_primaries()[0] + await r.client_setname("redis-py-c1", target_nodes="all") + await r2.client_setname("redis-py-c2", target_nodes="all") + clients = [ + client + for client in await r.client_list(target_nodes=node) + if client.get("name") in ["redis-py-c1", "redis-py-c2"] + ] + assert len(clients) == 2 + clients_by_name = {client.get("name"): client for client in clients} + + client_addr = clients_by_name["redis-py-c2"].get("addr") + assert await r.client_kill(client_addr, target_nodes=node) is True + + clients = [ + client + for client in await r.client_list(target_nodes=node) + if client.get("name") in ["redis-py-c1", "redis-py-c2"] + ] + assert len(clients) == 1 + assert clients[0].get("name") == "redis-py-c1" + + @skip_if_server_version_lt("2.6.0") + async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None: + await r.set("{foo}a", "") + await r.bitop("not", "{foo}r", "{foo}a") + assert await r.get("{foo}r") is None + + @skip_if_server_version_lt("2.6.0") + async def test_cluster_bitop_not(self, r: RedisCluster) -> None: + test_str = b"\xAA\x00\xFF\x55" + correct = ~0xAA00FF55 & 0xFFFFFFFF + await r.set("{foo}a", test_str) + await r.bitop("not", "{foo}r", "{foo}a") + assert int(binascii.hexlify(await r.get("{foo}r")), 16) == correct + + @skip_if_server_version_lt("2.6.0") + async def test_cluster_bitop_not_in_place(self, r: RedisCluster) -> None: + test_str = b"\xAA\x00\xFF\x55" + correct = ~0xAA00FF55 & 0xFFFFFFFF + await r.set("{foo}a", test_str) + await r.bitop("not", "{foo}a", "{foo}a") + assert int(binascii.hexlify(await r.get("{foo}a")), 16) == correct + + @skip_if_server_version_lt("2.6.0") + async def test_cluster_bitop_single_string(self, r: RedisCluster) -> None: + test_str = b"\x01\x02\xFF" + await r.set("{foo}a", test_str) + await r.bitop("and", "{foo}res1", "{foo}a") + await r.bitop("or", "{foo}res2", "{foo}a") + await r.bitop("xor", "{foo}res3", "{foo}a") + assert await r.get("{foo}res1") == test_str + assert await r.get("{foo}res2") == test_str + assert await r.get("{foo}res3") == test_str + + @skip_if_server_version_lt("2.6.0") + async def test_cluster_bitop_string_operands(self, r: RedisCluster) -> None: + await r.set("{foo}a", b"\x01\x02\xFF\xFF") + await r.set("{foo}b", b"\x01\x02\xFF") + await r.bitop("and", "{foo}res1", "{foo}a", "{foo}b") + await r.bitop("or", "{foo}res2", "{foo}a", "{foo}b") + await r.bitop("xor", "{foo}res3", "{foo}a", "{foo}b") + assert int(binascii.hexlify(await r.get("{foo}res1")), 16) == 0x0102FF00 + assert int(binascii.hexlify(await r.get("{foo}res2")), 16) == 0x0102FFFF + assert int(binascii.hexlify(await r.get("{foo}res3")), 16) == 0x000000FF + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_copy(self, r: RedisCluster) -> None: + assert await r.copy("{foo}a", "{foo}b") == 0 + await r.set("{foo}a", "bar") + assert await r.copy("{foo}a", "{foo}b") == 1 + assert await r.get("{foo}a") == b"bar" + assert await r.get("{foo}b") == b"bar" + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_copy_and_replace(self, r: RedisCluster) -> None: + await r.set("{foo}a", "foo1") + await r.set("{foo}b", "foo2") + assert await r.copy("{foo}a", "{foo}b") == 0 + assert await r.copy("{foo}a", "{foo}b", replace=True) == 1 + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_lmove(self, r: RedisCluster) -> None: + await r.rpush("{foo}a", "one", "two", "three", "four") + assert await r.lmove("{foo}a", "{foo}b") + assert await r.lmove("{foo}a", "{foo}b", "right", "left") + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_blmove(self, r: RedisCluster) -> None: + await r.rpush("{foo}a", "one", "two", "three", "four") + assert await r.blmove("{foo}a", "{foo}b", 5) + assert await r.blmove("{foo}a", "{foo}b", 1, "RIGHT", "LEFT") + + async def test_cluster_msetnx(self, r: RedisCluster) -> None: + d = {"{foo}a": b"1", "{foo}b": b"2", "{foo}c": b"3"} + assert await r.msetnx(d) + d2 = {"{foo}a": b"x", "{foo}d": b"4"} + assert not await r.msetnx(d2) + for k, v in d.items(): + assert await r.get(k) == v + assert await r.get("{foo}d") is None + + async def test_cluster_rename(self, r: RedisCluster) -> None: + await r.set("{foo}a", "1") + assert await r.rename("{foo}a", "{foo}b") + assert await r.get("{foo}a") is None + assert await r.get("{foo}b") == b"1" + + async def test_cluster_renamenx(self, r: RedisCluster) -> None: + await r.set("{foo}a", "1") + await r.set("{foo}b", "2") + assert not await r.renamenx("{foo}a", "{foo}b") + assert await r.get("{foo}a") == b"1" + assert await r.get("{foo}b") == b"2" + + # LIST COMMANDS + async def test_cluster_blpop(self, r: RedisCluster) -> None: + await r.rpush("{foo}a", "1", "2") + await r.rpush("{foo}b", "3", "4") + assert await r.blpop(["{foo}b", "{foo}a"], timeout=1) == (b"{foo}b", b"3") + assert await r.blpop(["{foo}b", "{foo}a"], timeout=1) == (b"{foo}b", b"4") + assert await r.blpop(["{foo}b", "{foo}a"], timeout=1) == (b"{foo}a", b"1") + assert await r.blpop(["{foo}b", "{foo}a"], timeout=1) == (b"{foo}a", b"2") + assert await r.blpop(["{foo}b", "{foo}a"], timeout=1) is None + await r.rpush("{foo}c", "1") + assert await r.blpop("{foo}c", timeout=1) == (b"{foo}c", b"1") + + async def test_cluster_brpop(self, r: RedisCluster) -> None: + await r.rpush("{foo}a", "1", "2") + await r.rpush("{foo}b", "3", "4") + assert await r.brpop(["{foo}b", "{foo}a"], timeout=1) == (b"{foo}b", b"4") + assert await r.brpop(["{foo}b", "{foo}a"], timeout=1) == (b"{foo}b", b"3") + assert await r.brpop(["{foo}b", "{foo}a"], timeout=1) == (b"{foo}a", b"2") + assert await r.brpop(["{foo}b", "{foo}a"], timeout=1) == (b"{foo}a", b"1") + assert await r.brpop(["{foo}b", "{foo}a"], timeout=1) is None + await r.rpush("{foo}c", "1") + assert await r.brpop("{foo}c", timeout=1) == (b"{foo}c", b"1") + + async def test_cluster_brpoplpush(self, r: RedisCluster) -> None: + await r.rpush("{foo}a", "1", "2") + await r.rpush("{foo}b", "3", "4") + assert await r.brpoplpush("{foo}a", "{foo}b") == b"2" + assert await r.brpoplpush("{foo}a", "{foo}b") == b"1" + assert await r.brpoplpush("{foo}a", "{foo}b", timeout=1) is None + assert await r.lrange("{foo}a", 0, -1) == [] + assert await r.lrange("{foo}b", 0, -1) == [b"1", b"2", b"3", b"4"] + + async def test_cluster_brpoplpush_empty_string(self, r: RedisCluster) -> None: + await r.rpush("{foo}a", "") + assert await r.brpoplpush("{foo}a", "{foo}b") == b"" + + async def test_cluster_rpoplpush(self, r: RedisCluster) -> None: + await r.rpush("{foo}a", "a1", "a2", "a3") + await r.rpush("{foo}b", "b1", "b2", "b3") + assert await r.rpoplpush("{foo}a", "{foo}b") == b"a3" + assert await r.lrange("{foo}a", 0, -1) == [b"a1", b"a2"] + assert await r.lrange("{foo}b", 0, -1) == [b"a3", b"b1", b"b2", b"b3"] + + async def test_cluster_sdiff(self, r: RedisCluster) -> None: + await r.sadd("{foo}a", "1", "2", "3") + assert await r.sdiff("{foo}a", "{foo}b") == {b"1", b"2", b"3"} + await r.sadd("{foo}b", "2", "3") + assert await r.sdiff("{foo}a", "{foo}b") == {b"1"} + + async def test_cluster_sdiffstore(self, r: RedisCluster) -> None: + await r.sadd("{foo}a", "1", "2", "3") + assert await r.sdiffstore("{foo}c", "{foo}a", "{foo}b") == 3 + assert await r.smembers("{foo}c") == {b"1", b"2", b"3"} + await r.sadd("{foo}b", "2", "3") + assert await r.sdiffstore("{foo}c", "{foo}a", "{foo}b") == 1 + assert await r.smembers("{foo}c") == {b"1"} + + async def test_cluster_sinter(self, r: RedisCluster) -> None: + await r.sadd("{foo}a", "1", "2", "3") + assert await r.sinter("{foo}a", "{foo}b") == set() + await r.sadd("{foo}b", "2", "3") + assert await r.sinter("{foo}a", "{foo}b") == {b"2", b"3"} + + async def test_cluster_sinterstore(self, r: RedisCluster) -> None: + await r.sadd("{foo}a", "1", "2", "3") + assert await r.sinterstore("{foo}c", "{foo}a", "{foo}b") == 0 + assert await r.smembers("{foo}c") == set() + await r.sadd("{foo}b", "2", "3") + assert await r.sinterstore("{foo}c", "{foo}a", "{foo}b") == 2 + assert await r.smembers("{foo}c") == {b"2", b"3"} + + async def test_cluster_smove(self, r: RedisCluster) -> None: + await r.sadd("{foo}a", "a1", "a2") + await r.sadd("{foo}b", "b1", "b2") + assert await r.smove("{foo}a", "{foo}b", "a1") + assert await r.smembers("{foo}a") == {b"a2"} + assert await r.smembers("{foo}b") == {b"b1", b"b2", b"a1"} + + async def test_cluster_sunion(self, r: RedisCluster) -> None: + await r.sadd("{foo}a", "1", "2") + await r.sadd("{foo}b", "2", "3") + assert await r.sunion("{foo}a", "{foo}b") == {b"1", b"2", b"3"} + + async def test_cluster_sunionstore(self, r: RedisCluster) -> None: + await r.sadd("{foo}a", "1", "2") + await r.sadd("{foo}b", "2", "3") + assert await r.sunionstore("{foo}c", "{foo}a", "{foo}b") == 3 + assert await r.smembers("{foo}c") == {b"1", b"2", b"3"} + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_zdiff(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3}) + await r.zadd("{foo}b", {"a1": 1, "a2": 2}) + assert await r.zdiff(["{foo}a", "{foo}b"]) == [b"a3"] + assert await r.zdiff(["{foo}a", "{foo}b"], withscores=True) == [b"a3", b"3"] + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_zdiffstore(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3}) + await r.zadd("{foo}b", {"a1": 1, "a2": 2}) + assert await r.zdiffstore("{foo}out", ["{foo}a", "{foo}b"]) + assert await r.zrange("{foo}out", 0, -1) == [b"a3"] + assert await r.zrange("{foo}out", 0, -1, withscores=True) == [(b"a3", 3.0)] + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_zinter(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 1}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert await r.zinter(["{foo}a", "{foo}b", "{foo}c"]) == [b"a3", b"a1"] + # invalid aggregation + with pytest.raises(DataError): + await r.zinter( + ["{foo}a", "{foo}b", "{foo}c"], aggregate="foo", withscores=True + ) + # aggregate with SUM + assert await r.zinter(["{foo}a", "{foo}b", "{foo}c"], withscores=True) == [ + (b"a3", 8), + (b"a1", 9), + ] + # aggregate with MAX + assert await r.zinter( + ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX", withscores=True + ) == [(b"a3", 5), (b"a1", 6)] + # aggregate with MIN + assert await r.zinter( + ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN", withscores=True + ) == [(b"a1", 1), (b"a3", 1)] + # with weights + assert await r.zinter( + {"{foo}a": 1, "{foo}b": 2, "{foo}c": 3}, withscores=True + ) == [(b"a3", 20), (b"a1", 23)] + + async def test_cluster_zinterstore_sum(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert await r.zinterstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"]) == 2 + assert await r.zrange("{foo}d", 0, -1, withscores=True) == [ + (b"a3", 8), + (b"a1", 9), + ] + + async def test_cluster_zinterstore_max(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert ( + await r.zinterstore( + "{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX" + ) + == 2 + ) + assert await r.zrange("{foo}d", 0, -1, withscores=True) == [ + (b"a3", 5), + (b"a1", 6), + ] + + async def test_cluster_zinterstore_min(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3}) + await r.zadd("{foo}b", {"a1": 2, "a2": 3, "a3": 5}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert ( + await r.zinterstore( + "{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN" + ) + == 2 + ) + assert await r.zrange("{foo}d", 0, -1, withscores=True) == [ + (b"a1", 1), + (b"a3", 3), + ] + + async def test_cluster_zinterstore_with_weight(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert ( + await r.zinterstore("{foo}d", {"{foo}a": 1, "{foo}b": 2, "{foo}c": 3}) == 2 + ) + assert await r.zrange("{foo}d", 0, -1, withscores=True) == [ + (b"a3", 20), + (b"a1", 23), + ] + + @skip_if_server_version_lt("4.9.0") + async def test_cluster_bzpopmax(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 2}) + await r.zadd("{foo}b", {"b1": 10, "b2": 20}) + assert await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1) == ( + b"{foo}b", + b"b2", + 20, + ) + assert await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1) == ( + b"{foo}b", + b"b1", + 10, + ) + assert await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1) == ( + b"{foo}a", + b"a2", + 2, + ) + assert await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1) == ( + b"{foo}a", + b"a1", + 1, + ) + assert await r.bzpopmax(["{foo}b", "{foo}a"], timeout=1) is None + await r.zadd("{foo}c", {"c1": 100}) + assert await r.bzpopmax("{foo}c", timeout=1) == (b"{foo}c", b"c1", 100) + + @skip_if_server_version_lt("4.9.0") + async def test_cluster_bzpopmin(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 2}) + await r.zadd("{foo}b", {"b1": 10, "b2": 20}) + assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) == ( + b"{foo}b", + b"b1", + 10, + ) + assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) == ( + b"{foo}b", + b"b2", + 20, + ) + assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) == ( + b"{foo}a", + b"a1", + 1, + ) + assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) == ( + b"{foo}a", + b"a2", + 2, + ) + assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) is None + await r.zadd("{foo}c", {"c1": 100}) + assert await r.bzpopmin("{foo}c", timeout=1) == (b"{foo}c", b"c1", 100) + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_zrangestore(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3}) + assert await r.zrangestore("{foo}b", "{foo}a", 0, 1) + assert await r.zrange("{foo}b", 0, -1) == [b"a1", b"a2"] + assert await r.zrangestore("{foo}b", "{foo}a", 1, 2) + assert await r.zrange("{foo}b", 0, -1) == [b"a2", b"a3"] + assert await r.zrange("{foo}b", 0, -1, withscores=True) == [ + (b"a2", 2), + (b"a3", 3), + ] + # reversed order + assert await r.zrangestore("{foo}b", "{foo}a", 1, 2, desc=True) + assert await r.zrange("{foo}b", 0, -1) == [b"a1", b"a2"] + # by score + assert await r.zrangestore( + "{foo}b", "{foo}a", 2, 1, byscore=True, offset=0, num=1, desc=True + ) + assert await r.zrange("{foo}b", 0, -1) == [b"a2"] + # by lex + assert await r.zrangestore( + "{foo}b", "{foo}a", "[a2", "(a3", bylex=True, offset=0, num=1 + ) + assert await r.zrange("{foo}b", 0, -1) == [b"a2"] + + @skip_if_server_version_lt("6.2.0") + async def test_cluster_zunion(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + # sum + assert await r.zunion(["{foo}a", "{foo}b", "{foo}c"]) == [ + b"a2", + b"a4", + b"a3", + b"a1", + ] + assert await r.zunion(["{foo}a", "{foo}b", "{foo}c"], withscores=True) == [ + (b"a2", 3), + (b"a4", 4), + (b"a3", 8), + (b"a1", 9), + ] + # max + assert await r.zunion( + ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX", withscores=True + ) == [(b"a2", 2), (b"a4", 4), (b"a3", 5), (b"a1", 6)] + # min + assert await r.zunion( + ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN", withscores=True + ) == [(b"a1", 1), (b"a2", 1), (b"a3", 1), (b"a4", 4)] + # with weight + assert await r.zunion( + {"{foo}a": 1, "{foo}b": 2, "{foo}c": 3}, withscores=True + ) == [(b"a2", 5), (b"a4", 12), (b"a3", 20), (b"a1", 23)] + + async def test_cluster_zunionstore_sum(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert await r.zunionstore("{foo}d", ["{foo}a", "{foo}b", "{foo}c"]) == 4 + assert await r.zrange("{foo}d", 0, -1, withscores=True) == [ + (b"a2", 3), + (b"a4", 4), + (b"a3", 8), + (b"a1", 9), + ] + + async def test_cluster_zunionstore_max(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert ( + await r.zunionstore( + "{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MAX" + ) + == 4 + ) + assert await r.zrange("{foo}d", 0, -1, withscores=True) == [ + (b"a2", 2), + (b"a4", 4), + (b"a3", 5), + (b"a1", 6), + ] + + async def test_cluster_zunionstore_min(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 2, "a3": 3}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 4}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert ( + await r.zunionstore( + "{foo}d", ["{foo}a", "{foo}b", "{foo}c"], aggregate="MIN" + ) + == 4 + ) + assert await r.zrange("{foo}d", 0, -1, withscores=True) == [ + (b"a1", 1), + (b"a2", 2), + (b"a3", 3), + (b"a4", 4), + ] + + async def test_cluster_zunionstore_with_weight(self, r: RedisCluster) -> None: + await r.zadd("{foo}a", {"a1": 1, "a2": 1, "a3": 1}) + await r.zadd("{foo}b", {"a1": 2, "a2": 2, "a3": 2}) + await r.zadd("{foo}c", {"a1": 6, "a3": 5, "a4": 4}) + assert ( + await r.zunionstore("{foo}d", {"{foo}a": 1, "{foo}b": 2, "{foo}c": 3}) == 4 + ) + assert await r.zrange("{foo}d", 0, -1, withscores=True) == [ + (b"a2", 5), + (b"a4", 12), + (b"a3", 20), + (b"a1", 23), + ] + + @skip_if_server_version_lt("2.8.9") + async def test_cluster_pfcount(self, r: RedisCluster) -> None: + members = {b"1", b"2", b"3"} + await r.pfadd("{foo}a", *members) + assert await r.pfcount("{foo}a") == len(members) + members_b = {b"2", b"3", b"4"} + await r.pfadd("{foo}b", *members_b) + assert await r.pfcount("{foo}b") == len(members_b) + assert await r.pfcount("{foo}a", "{foo}b") == len(members_b.union(members)) + + @skip_if_server_version_lt("2.8.9") + async def test_cluster_pfmerge(self, r: RedisCluster) -> None: + mema = {b"1", b"2", b"3"} + memb = {b"2", b"3", b"4"} + memc = {b"5", b"6", b"7"} + await r.pfadd("{foo}a", *mema) + await r.pfadd("{foo}b", *memb) + await r.pfadd("{foo}c", *memc) + await r.pfmerge("{foo}d", "{foo}c", "{foo}a") + assert await r.pfcount("{foo}d") == 6 + await r.pfmerge("{foo}d", "{foo}b") + assert await r.pfcount("{foo}d") == 7 + + async def test_cluster_sort_store(self, r: RedisCluster) -> None: + await r.rpush("{foo}a", "2", "3", "1") + assert await r.sort("{foo}a", store="{foo}sorted_values") == 3 + assert await r.lrange("{foo}sorted_values", 0, -1) == [b"1", b"2", b"3"] + + # GEO COMMANDS + @skip_if_server_version_lt("6.2.0") + async def test_cluster_geosearchstore(self, r: RedisCluster) -> None: + values = (2.1909389952632, 41.433791470673, "place1") + ( + 2.1873744593677, + 41.406342043777, + "place2", + ) + + await r.geoadd("{foo}barcelona", values) + await r.geosearchstore( + "{foo}places_barcelona", + "{foo}barcelona", + longitude=2.191, + latitude=41.433, + radius=1000, + ) + assert await r.zrange("{foo}places_barcelona", 0, -1) == [b"place1"] + + @skip_unless_arch_bits(64) + @skip_if_server_version_lt("6.2.0") + async def test_geosearchstore_dist(self, r: RedisCluster) -> None: + values = (2.1909389952632, 41.433791470673, "place1") + ( + 2.1873744593677, + 41.406342043777, + "place2", + ) + + await r.geoadd("{foo}barcelona", values) + await r.geosearchstore( + "{foo}places_barcelona", + "{foo}barcelona", + longitude=2.191, + latitude=41.433, + radius=1000, + storedist=True, + ) + # instead of save the geo score, the distance is saved. + assert await r.zscore("{foo}places_barcelona", "place1") == 88.05060698409301 + + @skip_if_server_version_lt("3.2.0") + async def test_cluster_georadius_store(self, r: RedisCluster) -> None: + values = (2.1909389952632, 41.433791470673, "place1") + ( + 2.1873744593677, + 41.406342043777, + "place2", + ) + + await r.geoadd("{foo}barcelona", values) + await r.georadius( + "{foo}barcelona", 2.191, 41.433, 1000, store="{foo}places_barcelona" + ) + assert await r.zrange("{foo}places_barcelona", 0, -1) == [b"place1"] + + @skip_unless_arch_bits(64) + @skip_if_server_version_lt("3.2.0") + async def test_cluster_georadius_store_dist(self, r: RedisCluster) -> None: + values = (2.1909389952632, 41.433791470673, "place1") + ( + 2.1873744593677, + 41.406342043777, + "place2", + ) + + await r.geoadd("{foo}barcelona", values) + await r.georadius( + "{foo}barcelona", 2.191, 41.433, 1000, store_dist="{foo}places_barcelona" + ) + # instead of save the geo score, the distance is saved. + assert await r.zscore("{foo}places_barcelona", "place1") == 88.05060698409301 + + async def test_cluster_dbsize(self, r: RedisCluster) -> None: + d = {"a": b"1", "b": b"2", "c": b"3", "d": b"4"} + assert await r.mset_nonatomic(d) + assert await r.dbsize(target_nodes="primaries") == len(d) + + async def test_cluster_keys(self, r: RedisCluster) -> None: + assert await r.keys() == [] + keys_with_underscores = {b"test_a", b"test_b"} + keys = keys_with_underscores.union({b"testc"}) + for key in keys: + await r.set(key, 1) + assert ( + set(await r.keys(pattern="test_*", target_nodes="primaries")) + == keys_with_underscores + ) + assert set(await r.keys(pattern="test*", target_nodes="primaries")) == keys + + # SCAN COMMANDS + @skip_if_server_version_lt("2.8.0") + async def test_cluster_scan(self, r: RedisCluster) -> None: + await r.set("a", 1) + await r.set("b", 2) + await r.set("c", 3) + + for target_nodes, nodes in zip( + ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] + ): + cursors, keys = await r.scan(target_nodes=target_nodes) + assert sorted(keys) == [b"a", b"b", b"c"] + assert sorted(cursors.keys()) == sorted(node.name for node in nodes) + assert all(cursor == 0 for cursor in cursors.values()) + + cursors, keys = await r.scan(match="a*", target_nodes=target_nodes) + assert sorted(keys) == [b"a"] + assert sorted(cursors.keys()) == sorted(node.name for node in nodes) + assert all(cursor == 0 for cursor in cursors.values()) + + @skip_if_server_version_lt("6.0.0") + async def test_cluster_scan_type(self, r: RedisCluster) -> None: + await r.sadd("a-set", 1) + await r.sadd("b-set", 1) + await r.sadd("c-set", 1) + await r.hset("a-hash", "foo", 2) + await r.lpush("a-list", "aux", 3) + + for target_nodes, nodes in zip( + ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()] + ): + cursors, keys = await r.scan(_type="SET", target_nodes=target_nodes) + assert sorted(keys) == [b"a-set", b"b-set", b"c-set"] + assert sorted(cursors.keys()) == sorted(node.name for node in nodes) + assert all(cursor == 0 for cursor in cursors.values()) + + cursors, keys = await r.scan( + _type="SET", match="a*", target_nodes=target_nodes + ) + assert sorted(keys) == [b"a-set"] + assert sorted(cursors.keys()) == sorted(node.name for node in nodes) + assert all(cursor == 0 for cursor in cursors.values()) + + @skip_if_server_version_lt("2.8.0") + async def test_cluster_scan_iter(self, r: RedisCluster) -> None: + keys_all = [] + keys_1 = [] + for i in range(100): + s = str(i) + await r.set(s, 1) + keys_all.append(s.encode("utf-8")) + if s.startswith("1"): + keys_1.append(s.encode("utf-8")) + keys_all.sort() + keys_1.sort() + + for target_nodes in ["primaries", "replicas"]: + keys = [key async for key in r.scan_iter(target_nodes=target_nodes)] + assert sorted(keys) == keys_all + + keys = [ + key async for key in r.scan_iter(match="1*", target_nodes=target_nodes) + ] + assert sorted(keys) == keys_1 + + async def test_cluster_randomkey(self, r: RedisCluster) -> None: + node = r.get_node_from_key("{foo}") + assert await r.randomkey(target_nodes=node) is None + for key in ("{foo}a", "{foo}b", "{foo}c"): + await r.set(key, 1) + assert await r.randomkey(target_nodes=node) in (b"{foo}a", b"{foo}b", b"{foo}c") + + @skip_if_server_version_lt("6.0.0") + @skip_if_redis_enterprise() + async def test_acl_log( + self, r: RedisCluster, request: FixtureRequest, create_redis: Callable + ) -> None: + key = "{cache}:" + node = r.get_node_from_key(key) + username = "redis-py-user" + + await r.acl_setuser( + username, + enabled=True, + reset=True, + commands=["+get", "+set", "+select", "+cluster", "+command", "+info"], + keys=["{cache}:*"], + nopass=True, + target_nodes="primaries", + ) + await r.acl_log_reset(target_nodes=node) + + user_client = await create_redis( + cls=RedisCluster, flushdb=False, username=username + ) + + # Valid operation and key + assert await user_client.set("{cache}:0", 1) + assert await user_client.get("{cache}:0") == b"1" + + # Invalid key + with pytest.raises(NoPermissionError): + await user_client.get("{cache}violated_cache:0") + + # Invalid operation + with pytest.raises(NoPermissionError): + await user_client.hset("{cache}:0", "hkey", "hval") + + assert isinstance(await r.acl_log(target_nodes=node), list) + assert len(await r.acl_log(target_nodes=node)) == 2 + assert len(await r.acl_log(count=1, target_nodes=node)) == 1 + assert isinstance((await r.acl_log(target_nodes=node))[0], dict) + assert "client-info" in (await r.acl_log(count=1, target_nodes=node))[0] + assert await r.acl_log_reset(target_nodes=node) + + await r.acl_deluser(username, target_nodes="primaries") + + await user_client.close() + + +@pytest.mark.onlycluster +class TestNodesManager: + """ + Tests for the NodesManager class + """ + + async def test_load_balancer(self, r: RedisCluster) -> None: + n_manager = r.nodes_manager + lb = n_manager.read_load_balancer + slot_1 = 1257 + slot_2 = 8975 + node_1 = ClusterNode(default_host, 6379, PRIMARY) + node_2 = ClusterNode(default_host, 6378, REPLICA) + node_3 = ClusterNode(default_host, 6377, REPLICA) + node_4 = ClusterNode(default_host, 6376, PRIMARY) + node_5 = ClusterNode(default_host, 6375, REPLICA) + n_manager.slots_cache = { + slot_1: [node_1, node_2, node_3], + slot_2: [node_4, node_5], + } + primary1_name = n_manager.slots_cache[slot_1][0].name + primary2_name = n_manager.slots_cache[slot_2][0].name + list1_size = len(n_manager.slots_cache[slot_1]) + list2_size = len(n_manager.slots_cache[slot_2]) + # slot 1 + assert lb.get_server_index(primary1_name, list1_size) == 0 + assert lb.get_server_index(primary1_name, list1_size) == 1 + assert lb.get_server_index(primary1_name, list1_size) == 2 + assert lb.get_server_index(primary1_name, list1_size) == 0 + # slot 2 + assert lb.get_server_index(primary2_name, list2_size) == 0 + assert lb.get_server_index(primary2_name, list2_size) == 1 + assert lb.get_server_index(primary2_name, list2_size) == 0 + + lb.reset() + assert lb.get_server_index(primary1_name, list1_size) == 0 + assert lb.get_server_index(primary2_name, list2_size) == 0 + + async def test_init_slots_cache_not_all_slots_covered(self) -> None: + """ + Test that if not all slots are covered it should raise an exception + """ + # Missing slot 5460 + cluster_slots = [ + [0, 5459, ["127.0.0.1", 7000], ["127.0.0.1", 7003]], + [5461, 10922, ["127.0.0.1", 7001], ["127.0.0.1", 7004]], + [10923, 16383, ["127.0.0.1", 7002], ["127.0.0.1", 7005]], + ] + with pytest.raises(RedisClusterException) as ex: + rc = await get_mocked_redis_client( + host=default_host, + port=default_port, + cluster_slots=cluster_slots, + require_full_coverage=True, + ) + await rc.close() + assert str(ex.value).startswith( + "All slots are not covered after query all startup_nodes." + ) + + async def test_init_slots_cache_not_require_full_coverage_success(self) -> None: + """ + When require_full_coverage is set to False and not all slots are + covered the cluster client initialization should succeed + """ + # Missing slot 5460 + cluster_slots = [ + [0, 5459, ["127.0.0.1", 7000], ["127.0.0.1", 7003]], + [5461, 10922, ["127.0.0.1", 7001], ["127.0.0.1", 7004]], + [10923, 16383, ["127.0.0.1", 7002], ["127.0.0.1", 7005]], + ] + + rc = await get_mocked_redis_client( + host=default_host, + port=default_port, + cluster_slots=cluster_slots, + require_full_coverage=False, + ) + + assert 5460 not in rc.nodes_manager.slots_cache + + await rc.close() + + async def test_init_slots_cache(self) -> None: + """ + Test that slots cache can in initialized and all slots are covered + """ + good_slots_resp = [ + [0, 5460, ["127.0.0.1", 7000], ["127.0.0.2", 7003]], + [5461, 10922, ["127.0.0.1", 7001], ["127.0.0.2", 7004]], + [10923, 16383, ["127.0.0.1", 7002], ["127.0.0.2", 7005]], + ] + + rc = await get_mocked_redis_client( + host=default_host, port=default_port, cluster_slots=good_slots_resp + ) + n_manager = rc.nodes_manager + assert len(n_manager.slots_cache) == REDIS_CLUSTER_HASH_SLOTS + for slot_info in good_slots_resp: + all_hosts = ["127.0.0.1", "127.0.0.2"] + all_ports = [7000, 7001, 7002, 7003, 7004, 7005] + slot_start = slot_info[0] + slot_end = slot_info[1] + for i in range(slot_start, slot_end + 1): + assert len(n_manager.slots_cache[i]) == len(slot_info[2:]) + assert n_manager.slots_cache[i][0].host in all_hosts + assert n_manager.slots_cache[i][1].host in all_hosts + assert n_manager.slots_cache[i][0].port in all_ports + assert n_manager.slots_cache[i][1].port in all_ports + + assert len(n_manager.nodes_cache) == 6 + + await rc.close() + + async def test_init_slots_cache_cluster_mode_disabled(self) -> None: + """ + Test that creating a RedisCluster failes if one of the startup nodes + has cluster mode disabled + """ + with pytest.raises(RedisClusterException) as e: + rc = await get_mocked_redis_client( + host=default_host, port=default_port, cluster_enabled=False + ) + await rc.close() + assert "Cluster mode is not enabled on this node" in str(e.value) + + async def test_empty_startup_nodes(self) -> None: + """ + It should not be possible to create a node manager with no nodes + specified + """ + with pytest.raises(RedisClusterException): + await NodesManager([]).initialize() + + async def test_wrong_startup_nodes_type(self) -> None: + """ + If something other then a list type itteratable is provided it should + fail + """ + with pytest.raises(RedisClusterException): + await NodesManager({}).initialize() + + async def test_init_slots_cache_slots_collision( + self, request: FixtureRequest + ) -> None: + """ + Test that if 2 nodes do not agree on the same slots setup it should + raise an error. In this test both nodes will say that the first + slots block should be bound to different servers. + """ + with mock.patch.object( + ClusterNode, "execute_command", autospec=True + ) as execute_command: + + async def mocked_execute_command(self, *args, **kwargs): + """ + Helper function to return custom slots cache data from + different redis nodes + """ + if self.port == 7000: + result = [ + [0, 5460, ["127.0.0.1", 7000], ["127.0.0.1", 7003]], + [5461, 10922, ["127.0.0.1", 7001], ["127.0.0.1", 7004]], + ] + + elif self.port == 7001: + result = [ + [0, 5460, ["127.0.0.1", 7001], ["127.0.0.1", 7003]], + [5461, 10922, ["127.0.0.1", 7000], ["127.0.0.1", 7004]], + ] + else: + result = [] + + if args[0] == "CLUSTER SLOTS": + return result + elif args[0] == "INFO": + return {"cluster_enabled": True} + elif args[1] == "cluster-require-full-coverage": + return {"cluster-require-full-coverage": "yes"} + + execute_command.side_effect = mocked_execute_command + + with pytest.raises(RedisClusterException) as ex: + node_1 = ClusterNode("127.0.0.1", 7000) + node_2 = ClusterNode("127.0.0.1", 7001) + async with RedisCluster(startup_nodes=[node_1, node_2]): + ... + assert str(ex.value).startswith( + "startup_nodes could not agree on a valid slots cache" + ), str(ex.value) + + async def test_cluster_one_instance(self) -> None: + """ + If the cluster exists of only 1 node then there is some hacks that must + be validated they work. + """ + node = ClusterNode(default_host, default_port) + cluster_slots = [[0, 16383, ["", default_port]]] + rc = await get_mocked_redis_client( + startup_nodes=[node], cluster_slots=cluster_slots + ) + + n = rc.nodes_manager + assert len(n.nodes_cache) == 1 + n_node = rc.get_node(node_name=node.name) + assert n_node is not None + assert n_node == node + assert n_node.server_type == PRIMARY + assert len(n.slots_cache) == REDIS_CLUSTER_HASH_SLOTS + for i in range(0, REDIS_CLUSTER_HASH_SLOTS): + assert n.slots_cache[i] == [n_node] + + await rc.close() + + async def test_init_with_down_node(self) -> None: + """ + If I can't connect to one of the nodes, everything should still work. + But if I can't connect to any of the nodes, exception should be thrown. + """ + with mock.patch.object( + ClusterNode, "execute_command", autospec=True + ) as execute_command: + + async def mocked_execute_command(self, *args, **kwargs): + if self.port == 7000: + raise ConnectionError("mock connection error for 7000") + + if args[0] == "CLUSTER SLOTS": + return [ + [0, 8191, ["127.0.0.1", 7001, "node_1"]], + [8192, 16383, ["127.0.0.1", 7002, "node_2"]], + ] + elif args[0] == "INFO": + return {"cluster_enabled": True} + elif args[1] == "cluster-require-full-coverage": + return {"cluster-require-full-coverage": "yes"} + + execute_command.side_effect = mocked_execute_command + + node_1 = ClusterNode("127.0.0.1", 7000) + node_2 = ClusterNode("127.0.0.1", 7001) + + # If all startup nodes fail to connect, connection error should be + # thrown + with pytest.raises(RedisClusterException) as e: + async with RedisCluster(startup_nodes=[node_1]): + ... + assert "Redis Cluster cannot be connected" in str(e.value) + + with mock.patch.object( + CommandsParser, "initialize", autospec=True + ) as cmd_parser_initialize: + + def cmd_init_mock(self, r): + self.commands = { + "GET": { + "name": "get", + "arity": 2, + "flags": ["readonly", "fast"], + "first_key_pos": 1, + "last_key_pos": 1, + "step_count": 1, + } + } + + cmd_parser_initialize.side_effect = cmd_init_mock + # When at least one startup node is reachable, the cluster + # initialization should succeeds + async with RedisCluster(startup_nodes=[node_1, node_2]) as rc: + assert rc.get_node(host=default_host, port=7001) is not None + assert rc.get_node(host=default_host, port=7002) is not None |