diff options
author | dvora-h <67596500+dvora-h@users.noreply.github.com> | 2022-03-06 13:49:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-06 13:49:13 +0200 |
commit | 98fd06eb8df1ecc109b4a5fb2d2a2e810142283e (patch) | |
tree | 0aec0ea9d1b87637381fbd06f7e93848ddc43433 | |
parent | c5d19b8571d2b15a29637f56a51b0da560072945 (diff) | |
download | redis-py-98fd06eb8df1ecc109b4a5fb2d2a2e810142283e.tar.gz |
Add support for JSON, TIMESERIES, BLOOM & GRAPH commands in cluster (#2032)
Co-authored-by: Chayim <chayim@users.noreply.github.com>
-rw-r--r-- | redis/cluster.py | 30 | ||||
-rw-r--r-- | redis/commands/cluster.py | 2 | ||||
-rw-r--r-- | redis/commands/json/__init__.py | 30 | ||||
-rw-r--r-- | redis/commands/parser.py | 9 | ||||
-rw-r--r-- | redis/commands/timeseries/__init__.py | 31 | ||||
-rw-r--r-- | tests/test_bloom.py | 1 | ||||
-rw-r--r-- | tests/test_graph.py | 1 | ||||
-rw-r--r-- | tests/test_search.py | 3 | ||||
-rw-r--r-- | tests/test_timeseries.py | 5 | ||||
-rw-r--r-- | tox.ini | 2 |
10 files changed, 98 insertions, 16 deletions
diff --git a/redis/cluster.py b/redis/cluster.py index 4491e29..4b327ad 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -284,6 +284,7 @@ class RedisCluster(RedisClusterCommands): "READONLY", "READWRITE", "TIME", + "GRAPH.CONFIG", ], DEFAULT_NODE, ), @@ -810,6 +811,10 @@ class RedisCluster(RedisClusterCommands): thread_local=thread_local, ) + def set_response_callback(self, command, callback): + """Set a custom Response Callback""" + self.cluster_response_callbacks[command] = callback + def _determine_nodes(self, *args, **kwargs): command = args[0] nodes_flag = kwargs.pop("nodes_flag", None) @@ -1181,6 +1186,20 @@ class RedisCluster(RedisClusterCommands): else: return res + def load_external_module( + self, + funcname, + func, + ): + """ + This function can be used to add externally defined redis modules, + and their namespaces to the redis client. + + ``funcname`` - A string containing the name of the function to create + ``func`` - The function, being added to this class. + """ + setattr(self, funcname, func) + class ClusterNode: def __init__(self, host, port, server_type=None, redis_connection=None): @@ -2026,7 +2045,13 @@ class ClusterPipeline(RedisCluster): # turn the response back into a simple flat array that corresponds # to the sequence of commands issued in the stack in pipeline.execute() - response = [c.result for c in sorted(stack, key=lambda x: x.position)] + response = [] + for c in sorted(stack, key=lambda x: x.position): + if c.args[0] in self.cluster_response_callbacks: + c.result = self.cluster_response_callbacks[c.args[0]]( + c.result, **c.options + ) + response.append(c.result) if raise_on_error: self.raise_first_error(stack) @@ -2040,6 +2065,9 @@ class ClusterPipeline(RedisCluster): "ASK & MOVED redirection not allowed in this pipeline" ) + def exists(self, *keys): + return self.execute_command("EXISTS", *keys) + def eval(self): """ """ raise RedisClusterException("method eval() is not implemented") diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 8bdcbba..7342c0c 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -9,6 +9,7 @@ from .core import ( ScriptCommands, ) from .helpers import list_or_args +from .redismodules import RedisModuleCommands class ClusterMultiKeyCommands: @@ -212,6 +213,7 @@ class RedisClusterCommands( PubSubCommands, ClusterDataAccessCommands, ScriptCommands, + RedisModuleCommands, ): """ A class for all Redis Cluster commands diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py index 12c0648..638e4eb 100644 --- a/redis/commands/json/__init__.py +++ b/redis/commands/json/__init__.py @@ -103,16 +103,34 @@ class JSON(JSONCommands): pipe.jsonget('foo') pipe.jsonget('notakey') """ - p = Pipeline( - connection_pool=self.client.connection_pool, - response_callbacks=self.MODULE_CALLBACKS, - transaction=transaction, - shard_hint=shard_hint, - ) + if isinstance(self.client, redis.RedisCluster): + p = ClusterPipeline( + nodes_manager=self.client.nodes_manager, + commands_parser=self.client.commands_parser, + startup_nodes=self.client.nodes_manager.startup_nodes, + result_callbacks=self.client.result_callbacks, + cluster_response_callbacks=self.client.cluster_response_callbacks, + cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, + read_from_replicas=self.client.read_from_replicas, + reinitialize_steps=self.client.reinitialize_steps, + ) + + else: + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) + p._encode = self._encode p._decode = self._decode return p +class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline): + """Cluster pipeline for the module.""" + + class Pipeline(JSONCommands, redis.client.Pipeline): """Pipeline for the module.""" diff --git a/redis/commands/parser.py b/redis/commands/parser.py index 2bb0576..89292ab 100644 --- a/redis/commands/parser.py +++ b/redis/commands/parser.py @@ -17,7 +17,14 @@ class CommandsParser: self.initialize(redis_connection) def initialize(self, r): - self.commands = r.execute_command("COMMAND") + commands = r.execute_command("COMMAND") + uppercase_commands = [] + for cmd in commands: + if any(x.isupper() for x in cmd): + uppercase_commands.append(cmd) + for cmd in uppercase_commands: + commands[cmd.lower()] = commands.pop(cmd) + self.commands = commands # As soon as this PR is merged into Redis, we should reimplement # our logic to use COMMAND INFO changes to determine the key positions diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py index 5b1f151..4720a43 100644 --- a/redis/commands/timeseries/__init__.py +++ b/redis/commands/timeseries/__init__.py @@ -1,4 +1,4 @@ -import redis.client +import redis from ..helpers import parse_to_list from .commands import ( @@ -67,14 +67,31 @@ class TimeSeries(TimeSeriesCommands): pipeline.execute() """ - p = Pipeline( - connection_pool=self.client.connection_pool, - response_callbacks=self.MODULE_CALLBACKS, - transaction=transaction, - shard_hint=shard_hint, - ) + if isinstance(self.client, redis.RedisCluster): + p = ClusterPipeline( + nodes_manager=self.client.nodes_manager, + commands_parser=self.client.commands_parser, + startup_nodes=self.client.nodes_manager.startup_nodes, + result_callbacks=self.client.result_callbacks, + cluster_response_callbacks=self.client.cluster_response_callbacks, + cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, + read_from_replicas=self.client.read_from_replicas, + reinitialize_steps=self.client.reinitialize_steps, + ) + + else: + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) return p +class ClusterPipeline(TimeSeriesCommands, redis.cluster.ClusterPipeline): + """Cluster pipeline for the module.""" + + class Pipeline(TimeSeriesCommands, redis.client.Pipeline): """Pipeline for the module.""" diff --git a/tests/test_bloom.py b/tests/test_bloom.py index 8936584..a3e9e15 100644 --- a/tests/test_bloom.py +++ b/tests/test_bloom.py @@ -191,6 +191,7 @@ def test_cms(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_cms_merge(client): assert client.cms().initbydim("A", 1000, 5) assert client.cms().initbydim("B", 1000, 5) diff --git a/tests/test_graph.py b/tests/test_graph.py index 8de6346..3a430ed 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -342,6 +342,7 @@ def test_config(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_list_keys(client): result = client.graph().list_keys() assert result == [] diff --git a/tests/test_search.py b/tests/test_search.py index 5ee17a2..88d57a9 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -21,6 +21,9 @@ from redis.commands.search.suggestion import Suggestion from .conftest import default_redismod_url, skip_ifmodversion_lt +pytestmark = pytest.mark.onlynoncluster + + WILL_PLAY_TEXT = os.path.abspath( os.path.join(os.path.dirname(__file__), "testdata", "will_play_text.csv.bz2") ) diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index aee37aa..421c9d5 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -264,6 +264,7 @@ def test_rev_range(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def testMultiRange(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"}) @@ -293,6 +294,7 @@ def testMultiRange(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster @skip_ifmodversion_lt("99.99.99", "timeseries") def test_multi_range_advanced(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) @@ -349,6 +351,7 @@ def test_multi_range_advanced(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster @skip_ifmodversion_lt("99.99.99", "timeseries") def test_multi_reverse_range(client): client.ts().create(1, labels={"Test": "This", "team": "ny"}) @@ -442,6 +445,7 @@ def test_get(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_mget(client): client.ts().create(1, labels={"Test": "This"}) client.ts().create(2, labels={"Test": "This", "Taste": "That"}) @@ -483,6 +487,7 @@ def testInfoDuplicatePolicy(client): @pytest.mark.redismod +@pytest.mark.onlynoncluster def test_query_index(client): client.ts().create(1, labels={"Test": "This"}) client.ts().create(2, labels={"Test": "This", "Taste": "That"}) @@ -286,7 +286,7 @@ setenv = commands = standalone: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' {posargs} standalone-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs} - cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} {posargs} + cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} --redismod-url={env:CLUSTER_URL:} {posargs} cluster-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs} [testenv:redis5] |