summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUtkarsh Gupta <utkarshgupta137@gmail.com>2022-03-23 14:48:16 +0530
committerGitHub <noreply@github.com>2022-03-23 11:18:16 +0200
commit827dcde5c0af5f7aa9bdc3999fc86aa2ba945118 (patch)
tree2a7bf80c18b3c2eb391a779c0a4c965076114f7e
parent032fd227d3325a24a47b9d33fc42bccaafba28e2 (diff)
downloadredis-py-827dcde5c0af5f7aa9bdc3999fc86aa2ba945118.tar.gz
[CLUSTER] Fix scan command cursors & Fix scan_iter (#2054)
* cluster/scan: fix return cursor & change default node to primaries * cluster/scan_iter: fix iteration Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
-rw-r--r--CHANGES4
-rw-r--r--redis/cluster.py16
-rw-r--r--redis/commands/cluster.py38
-rw-r--r--tests/test_cluster.py59
4 files changed, 96 insertions, 21 deletions
diff --git a/CHANGES b/CHANGES
index aa5192d..95843ec 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,8 +1,10 @@
* Add `items` parameter to `hset` signature
- * Create codeql-analysis.yml (#1988). Thanks @chayim
+ * Create codeql-analysis.yml (#1988). Thanks @chayim
* Add limited support for Lua scripting with RedisCluster
* Implement `.lock()` method on RedisCluster
+ * Fix cursor returned by SCAN for RedisCluster & change default target to PRIMARIES
+ * Fix scan_iter for RedisCluster
* Remove verbose logging when initializing ClusterPubSub, ClusterPipeline or RedisCluster
* 4.1.3 (Feb 8, 2022)
diff --git a/redis/cluster.py b/redis/cluster.py
index 287fd4f..221df85 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -7,7 +7,7 @@ import threading
import time
from collections import OrderedDict
-from redis.client import CaseInsensitiveDict, PubSub, Redis
+from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
from redis.commands import CommandsParser, RedisClusterCommands
from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
@@ -51,10 +51,14 @@ def get_connection(redis_node, *args, **options):
def parse_scan_result(command, res, **options):
- keys_list = []
- for primary_res in res.values():
- keys_list += primary_res[1]
- return 0, keys_list
+ cursors = {}
+ ret = []
+ for node_name, response in res.items():
+ cursor, r = parse_scan(response, **options)
+ cursors[node_name] = cursor
+ ret += r
+
+ return cursors, ret
def parse_pubsub_numsub(command, res, **options):
@@ -244,7 +248,6 @@ class RedisCluster(RedisClusterCommands):
"INFO",
"SHUTDOWN",
"KEYS",
- "SCAN",
"DBSIZE",
"BGSAVE",
"SLOWLOG GET",
@@ -298,6 +301,7 @@ class RedisCluster(RedisClusterCommands):
"FUNCTION LIST",
"FUNCTION LOAD",
"FUNCTION RESTORE",
+ "SCAN",
"SCRIPT EXISTS",
"SCRIPT FLUSH",
"SCRIPT LOAD",
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py
index d68f93b..bcbfbd5 100644
--- a/redis/commands/cluster.py
+++ b/redis/commands/cluster.py
@@ -1,5 +1,8 @@
+from typing import Iterator, Union
+
from redis.crc import key_slot
from redis.exceptions import RedisClusterException, RedisError
+from redis.typing import PatternT
from .core import (
ACLCommands,
@@ -206,6 +209,41 @@ class ClusterDataAccessCommands(DataAccessCommands):
**kwargs,
)
+ def scan_iter(
+ self,
+ match: Union[PatternT, None] = None,
+ count: Union[int, None] = None,
+ _type: Union[str, None] = None,
+ **kwargs,
+ ) -> Iterator:
+ # Do the first query with cursor=0 for all nodes
+ cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
+ yield from data
+
+ cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
+ if cursors:
+ # Get nodes by name
+ nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
+
+ # Iterate over each node till its cursor is 0
+ kwargs.pop("target_nodes", None)
+ while cursors:
+ for name, cursor in cursors.items():
+ cur, data = self.scan(
+ cursor=cursor,
+ match=match,
+ count=count,
+ _type=_type,
+ target_nodes=nodes[name],
+ **kwargs,
+ )
+ yield from data
+ cursors[name] = cur[name]
+
+ cursors = {
+ name: cursor for name, cursor in cursors.items() if cursor != 0
+ }
+
class RedisClusterCommands(
ClusterMultiKeyCommands,
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index e9242e0..70cbe4f 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -1773,29 +1773,60 @@ class TestClusterRedisCommands:
r.set("a", 1)
r.set("b", 2)
r.set("c", 3)
- cursor, keys = r.scan(target_nodes="primaries")
- assert cursor == 0
- assert set(keys) == {b"a", b"b", b"c"}
- _, keys = r.scan(match="a", target_nodes="primaries")
- assert set(keys) == {b"a"}
+
+ for target_nodes, nodes in zip(
+ ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()]
+ ):
+ cursors, keys = r.scan(target_nodes=target_nodes)
+ assert sorted(keys) == [b"a", b"b", b"c"]
+ assert sorted(cursors.keys()) == sorted(node.name for node in nodes)
+ assert all(cursor == 0 for cursor in cursors.values())
+
+ cursors, keys = r.scan(match="a*", target_nodes=target_nodes)
+ assert sorted(keys) == [b"a"]
+ assert sorted(cursors.keys()) == sorted(node.name for node in nodes)
+ assert all(cursor == 0 for cursor in cursors.values())
@skip_if_server_version_lt("6.0.0")
def test_cluster_scan_type(self, r):
r.sadd("a-set", 1)
+ r.sadd("b-set", 1)
+ r.sadd("c-set", 1)
r.hset("a-hash", "foo", 2)
r.lpush("a-list", "aux", 3)
- _, keys = r.scan(match="a*", _type="SET", target_nodes="primaries")
- assert set(keys) == {b"a-set"}
+
+ for target_nodes, nodes in zip(
+ ["primaries", "replicas"], [r.get_primaries(), r.get_replicas()]
+ ):
+ cursors, keys = r.scan(_type="SET", target_nodes=target_nodes)
+ assert sorted(keys) == [b"a-set", b"b-set", b"c-set"]
+ assert sorted(cursors.keys()) == sorted(node.name for node in nodes)
+ assert all(cursor == 0 for cursor in cursors.values())
+
+ cursors, keys = r.scan(_type="SET", match="a*", target_nodes=target_nodes)
+ assert sorted(keys) == [b"a-set"]
+ assert sorted(cursors.keys()) == sorted(node.name for node in nodes)
+ assert all(cursor == 0 for cursor in cursors.values())
@skip_if_server_version_lt("2.8.0")
def test_cluster_scan_iter(self, r):
- r.set("a", 1)
- r.set("b", 2)
- r.set("c", 3)
- keys = list(r.scan_iter(target_nodes="primaries"))
- assert set(keys) == {b"a", b"b", b"c"}
- keys = list(r.scan_iter(match="a", target_nodes="primaries"))
- assert set(keys) == {b"a"}
+ keys_all = []
+ keys_1 = []
+ for i in range(100):
+ s = str(i)
+ r.set(s, 1)
+ keys_all.append(s.encode("utf-8"))
+ if s.startswith("1"):
+ keys_1.append(s.encode("utf-8"))
+ keys_all.sort()
+ keys_1.sort()
+
+ for target_nodes in ["primaries", "replicas"]:
+ keys = r.scan_iter(target_nodes=target_nodes)
+ assert sorted(keys) == keys_all
+
+ keys = r.scan_iter(match="1*", target_nodes=target_nodes)
+ assert sorted(keys) == keys_1
def test_cluster_randomkey(self, r):
node = r.get_node_from_key("{foo}")