summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordvora-h <67596500+dvora-h@users.noreply.github.com>2022-03-06 13:49:13 +0200
committerGitHub <noreply@github.com>2022-03-06 13:49:13 +0200
commit98fd06eb8df1ecc109b4a5fb2d2a2e810142283e (patch)
tree0aec0ea9d1b87637381fbd06f7e93848ddc43433
parentc5d19b8571d2b15a29637f56a51b0da560072945 (diff)
downloadredis-py-98fd06eb8df1ecc109b4a5fb2d2a2e810142283e.tar.gz
Add support for JSON, TIMESERIES, BLOOM & GRAPH commands in cluster (#2032)
Co-authored-by: Chayim <chayim@users.noreply.github.com>
-rw-r--r--redis/cluster.py30
-rw-r--r--redis/commands/cluster.py2
-rw-r--r--redis/commands/json/__init__.py30
-rw-r--r--redis/commands/parser.py9
-rw-r--r--redis/commands/timeseries/__init__.py31
-rw-r--r--tests/test_bloom.py1
-rw-r--r--tests/test_graph.py1
-rw-r--r--tests/test_search.py3
-rw-r--r--tests/test_timeseries.py5
-rw-r--r--tox.ini2
10 files changed, 98 insertions, 16 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index 4491e29..4b327ad 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -284,6 +284,7 @@ class RedisCluster(RedisClusterCommands):
"READONLY",
"READWRITE",
"TIME",
+ "GRAPH.CONFIG",
],
DEFAULT_NODE,
),
@@ -810,6 +811,10 @@ class RedisCluster(RedisClusterCommands):
thread_local=thread_local,
)
+ def set_response_callback(self, command, callback):
+ """Set a custom Response Callback"""
+ self.cluster_response_callbacks[command] = callback
+
def _determine_nodes(self, *args, **kwargs):
command = args[0]
nodes_flag = kwargs.pop("nodes_flag", None)
@@ -1181,6 +1186,20 @@ class RedisCluster(RedisClusterCommands):
else:
return res
+ def load_external_module(
+ self,
+ funcname,
+ func,
+ ):
+ """
+ This function can be used to add externally defined redis modules,
+ and their namespaces to the redis client.
+
+ ``funcname`` - A string containing the name of the function to create
+ ``func`` - The function, being added to this class.
+ """
+ setattr(self, funcname, func)
+
class ClusterNode:
def __init__(self, host, port, server_type=None, redis_connection=None):
@@ -2026,7 +2045,13 @@ class ClusterPipeline(RedisCluster):
# turn the response back into a simple flat array that corresponds
# to the sequence of commands issued in the stack in pipeline.execute()
- response = [c.result for c in sorted(stack, key=lambda x: x.position)]
+ response = []
+ for c in sorted(stack, key=lambda x: x.position):
+ if c.args[0] in self.cluster_response_callbacks:
+ c.result = self.cluster_response_callbacks[c.args[0]](
+ c.result, **c.options
+ )
+ response.append(c.result)
if raise_on_error:
self.raise_first_error(stack)
@@ -2040,6 +2065,9 @@ class ClusterPipeline(RedisCluster):
"ASK & MOVED redirection not allowed in this pipeline"
)
+ def exists(self, *keys):
+ return self.execute_command("EXISTS", *keys)
+
def eval(self):
""" """
raise RedisClusterException("method eval() is not implemented")
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py
index 8bdcbba..7342c0c 100644
--- a/redis/commands/cluster.py
+++ b/redis/commands/cluster.py
@@ -9,6 +9,7 @@ from .core import (
ScriptCommands,
)
from .helpers import list_or_args
+from .redismodules import RedisModuleCommands
class ClusterMultiKeyCommands:
@@ -212,6 +213,7 @@ class RedisClusterCommands(
PubSubCommands,
ClusterDataAccessCommands,
ScriptCommands,
+ RedisModuleCommands,
):
"""
A class for all Redis Cluster commands
diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py
index 12c0648..638e4eb 100644
--- a/redis/commands/json/__init__.py
+++ b/redis/commands/json/__init__.py
@@ -103,16 +103,34 @@ class JSON(JSONCommands):
pipe.jsonget('foo')
pipe.jsonget('notakey')
"""
- p = Pipeline(
- connection_pool=self.client.connection_pool,
- response_callbacks=self.MODULE_CALLBACKS,
- transaction=transaction,
- shard_hint=shard_hint,
- )
+ if isinstance(self.client, redis.RedisCluster):
+ p = ClusterPipeline(
+ nodes_manager=self.client.nodes_manager,
+ commands_parser=self.client.commands_parser,
+ startup_nodes=self.client.nodes_manager.startup_nodes,
+ result_callbacks=self.client.result_callbacks,
+ cluster_response_callbacks=self.client.cluster_response_callbacks,
+ cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
+ read_from_replicas=self.client.read_from_replicas,
+ reinitialize_steps=self.client.reinitialize_steps,
+ )
+
+ else:
+ p = Pipeline(
+ connection_pool=self.client.connection_pool,
+ response_callbacks=self.MODULE_CALLBACKS,
+ transaction=transaction,
+ shard_hint=shard_hint,
+ )
+
p._encode = self._encode
p._decode = self._decode
return p
+class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline):
+ """Cluster pipeline for the module."""
+
+
class Pipeline(JSONCommands, redis.client.Pipeline):
"""Pipeline for the module."""
diff --git a/redis/commands/parser.py b/redis/commands/parser.py
index 2bb0576..89292ab 100644
--- a/redis/commands/parser.py
+++ b/redis/commands/parser.py
@@ -17,7 +17,14 @@ class CommandsParser:
self.initialize(redis_connection)
def initialize(self, r):
- self.commands = r.execute_command("COMMAND")
+ commands = r.execute_command("COMMAND")
+ uppercase_commands = []
+ for cmd in commands:
+ if any(x.isupper() for x in cmd):
+ uppercase_commands.append(cmd)
+ for cmd in uppercase_commands:
+ commands[cmd.lower()] = commands.pop(cmd)
+ self.commands = commands
# As soon as this PR is merged into Redis, we should reimplement
# our logic to use COMMAND INFO changes to determine the key positions
diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py
index 5b1f151..4720a43 100644
--- a/redis/commands/timeseries/__init__.py
+++ b/redis/commands/timeseries/__init__.py
@@ -1,4 +1,4 @@
-import redis.client
+import redis
from ..helpers import parse_to_list
from .commands import (
@@ -67,14 +67,31 @@ class TimeSeries(TimeSeriesCommands):
pipeline.execute()
"""
- p = Pipeline(
- connection_pool=self.client.connection_pool,
- response_callbacks=self.MODULE_CALLBACKS,
- transaction=transaction,
- shard_hint=shard_hint,
- )
+ if isinstance(self.client, redis.RedisCluster):
+ p = ClusterPipeline(
+ nodes_manager=self.client.nodes_manager,
+ commands_parser=self.client.commands_parser,
+ startup_nodes=self.client.nodes_manager.startup_nodes,
+ result_callbacks=self.client.result_callbacks,
+ cluster_response_callbacks=self.client.cluster_response_callbacks,
+ cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
+ read_from_replicas=self.client.read_from_replicas,
+ reinitialize_steps=self.client.reinitialize_steps,
+ )
+
+ else:
+ p = Pipeline(
+ connection_pool=self.client.connection_pool,
+ response_callbacks=self.MODULE_CALLBACKS,
+ transaction=transaction,
+ shard_hint=shard_hint,
+ )
return p
+class ClusterPipeline(TimeSeriesCommands, redis.cluster.ClusterPipeline):
+ """Cluster pipeline for the module."""
+
+
class Pipeline(TimeSeriesCommands, redis.client.Pipeline):
"""Pipeline for the module."""
diff --git a/tests/test_bloom.py b/tests/test_bloom.py
index 8936584..a3e9e15 100644
--- a/tests/test_bloom.py
+++ b/tests/test_bloom.py
@@ -191,6 +191,7 @@ def test_cms(client):
@pytest.mark.redismod
+@pytest.mark.onlynoncluster
def test_cms_merge(client):
assert client.cms().initbydim("A", 1000, 5)
assert client.cms().initbydim("B", 1000, 5)
diff --git a/tests/test_graph.py b/tests/test_graph.py
index 8de6346..3a430ed 100644
--- a/tests/test_graph.py
+++ b/tests/test_graph.py
@@ -342,6 +342,7 @@ def test_config(client):
@pytest.mark.redismod
+@pytest.mark.onlynoncluster
def test_list_keys(client):
result = client.graph().list_keys()
assert result == []
diff --git a/tests/test_search.py b/tests/test_search.py
index 5ee17a2..88d57a9 100644
--- a/tests/test_search.py
+++ b/tests/test_search.py
@@ -21,6 +21,9 @@ from redis.commands.search.suggestion import Suggestion
from .conftest import default_redismod_url, skip_ifmodversion_lt
+pytestmark = pytest.mark.onlynoncluster
+
+
WILL_PLAY_TEXT = os.path.abspath(
os.path.join(os.path.dirname(__file__), "testdata", "will_play_text.csv.bz2")
)
diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py
index aee37aa..421c9d5 100644
--- a/tests/test_timeseries.py
+++ b/tests/test_timeseries.py
@@ -264,6 +264,7 @@ def test_rev_range(client):
@pytest.mark.redismod
+@pytest.mark.onlynoncluster
def testMultiRange(client):
client.ts().create(1, labels={"Test": "This", "team": "ny"})
client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"})
@@ -293,6 +294,7 @@ def testMultiRange(client):
@pytest.mark.redismod
+@pytest.mark.onlynoncluster
@skip_ifmodversion_lt("99.99.99", "timeseries")
def test_multi_range_advanced(client):
client.ts().create(1, labels={"Test": "This", "team": "ny"})
@@ -349,6 +351,7 @@ def test_multi_range_advanced(client):
@pytest.mark.redismod
+@pytest.mark.onlynoncluster
@skip_ifmodversion_lt("99.99.99", "timeseries")
def test_multi_reverse_range(client):
client.ts().create(1, labels={"Test": "This", "team": "ny"})
@@ -442,6 +445,7 @@ def test_get(client):
@pytest.mark.redismod
+@pytest.mark.onlynoncluster
def test_mget(client):
client.ts().create(1, labels={"Test": "This"})
client.ts().create(2, labels={"Test": "This", "Taste": "That"})
@@ -483,6 +487,7 @@ def testInfoDuplicatePolicy(client):
@pytest.mark.redismod
+@pytest.mark.onlynoncluster
def test_query_index(client):
client.ts().create(1, labels={"Test": "This"})
client.ts().create(2, labels={"Test": "This", "Taste": "That"})
diff --git a/tox.ini b/tox.ini
index 82e79d7..d4f15ac 100644
--- a/tox.ini
+++ b/tox.ini
@@ -286,7 +286,7 @@ setenv =
commands =
standalone: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' {posargs}
standalone-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs}
- cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} {posargs}
+ cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} --redismod-url={env:CLUSTER_URL:} {posargs}
cluster-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs}
[testenv:redis5]