From 0e73a4e510b62026164f85e35997e0d1e913e3a4 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Tue, 9 Nov 2021 10:57:20 +0200 Subject: Migrating pipeline support for JSON and TS --- redis/commands/helpers.py | 32 +++++++++++++++++++++++++++ redis/commands/json/__init__.py | 27 +++++++++++++++++++++++ redis/commands/json/commands.py | 18 +++++++++++++++ redis/commands/timeseries/__init__.py | 41 ++++++++++++++++++++++++++++------- tests/test_json.py | 32 ++++++++++++++++++--------- tests/test_timeseries.py | 31 +++++++++++--------------- tox.ini | 11 ++++++++-- 7 files changed, 154 insertions(+), 38 deletions(-) diff --git a/redis/commands/helpers.py b/redis/commands/helpers.py index 2a4298c..22cb622 100644 --- a/redis/commands/helpers.py +++ b/redis/commands/helpers.py @@ -1,3 +1,7 @@ +import random +import string + + def list_or_args(keys, args): # returns a single new list combining keys and args try: @@ -42,3 +46,31 @@ def parse_to_list(response): except TypeError: res.append(None) return res + + +def random_string(length=10): + """ + Returns a random N character long string. + """ + return "".join( # nosec + random.choice(string.ascii_lowercase) for x in range(length) + ) + + +def quote_string(v): + """ + RedisGraph strings must be quoted, + quote_string wraps given v with quotes incase + v is a string. + """ + + if isinstance(v, bytes): + v = v.decode() + elif not isinstance(v, str): + return v + if len(v) == 0: + return '""' + + v = v.replace('"', '\\"') + + return '"{}"'.format(v) diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py index d00627e..d634dbd 100644 --- a/redis/commands/json/__init__.py +++ b/redis/commands/json/__init__.py @@ -6,6 +6,7 @@ from .decoders import ( ) from ..helpers import nativestr from .commands import JSONCommands +import redis class JSON(JSONCommands): @@ -91,3 +92,29 @@ class JSON(JSONCommands): def _encode(self, obj): """Get the encoder.""" return self.__encoder__.encode(obj) + + def pipeline(self, transaction=True, shard_hint=None): + """Creates a pipeline for the JSON module, that can be used for executing + JSON commands, as well as classic core commands. + + Usage example: + + r = redis.Redis() + pipe = r.json().pipeline() + pipe.jsonset('foo', '.', {'hello!': 'world'}) + pipe.jsonget('foo') + pipe.jsonget('notakey') + """ + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) + p._encode = self._encode + p._decode = self._decode + return p + + +class Pipeline(JSONCommands, redis.client.Pipeline): + """Pipeline for the module.""" diff --git a/redis/commands/json/commands.py b/redis/commands/json/commands.py index 716741c..4436f6a 100644 --- a/redis/commands/json/commands.py +++ b/redis/commands/json/commands.py @@ -154,6 +154,9 @@ class JSONCommands: ``xx`` if set to True, set ``value`` only if it exists. ``decode_keys`` If set to True, the keys of ``obj`` will be decoded with utf-8. + + For the purpose of using this within a pipeline, this command is also + aliased to jsonset. """ if decode_keys: obj = decode_dict_keys(obj) @@ -212,3 +215,18 @@ class JSONCommands: pieces.append(key) pieces.append(str(path)) return self.execute_command("JSON.DEBUG", *pieces) + + @deprecated(version='4.0.0', + reason='redisjson-py supported this, call get directly.') + def jsonget(self, *args, **kwargs): + return self.get(*args, **kwargs) + + @deprecated(version='4.0.0', + reason='redisjson-py supported this, call get directly.') + def jsonmget(self, *args, **kwargs): + return self.mget(*args, **kwargs) + + @deprecated(version='4.0.0', + reason='redisjson-py supported this, call get directly.') + def jsonset(self, *args, **kwargs): + return self.set(*args, **kwargs) diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py index db9c3a5..83fa170 100644 --- a/redis/commands/timeseries/__init__.py +++ b/redis/commands/timeseries/__init__.py @@ -1,4 +1,4 @@ -from redis.client import bool_ok +import redis.client from .utils import ( parse_range, @@ -37,12 +37,12 @@ class TimeSeries(TimeSeriesCommands): def __init__(self, client=None, version=None, **kwargs): """Create a new RedisTimeSeries client.""" # Set the module commands' callbacks - MODULE_CALLBACKS = { - CREATE_CMD: bool_ok, - ALTER_CMD: bool_ok, - CREATERULE_CMD: bool_ok, + self.MODULE_CALLBACKS = { + CREATE_CMD: redis.client.bool_ok, + ALTER_CMD: redis.client.bool_ok, + CREATERULE_CMD: redis.client.bool_ok, DEL_CMD: int, - DELETERULE_CMD: bool_ok, + DELETERULE_CMD: redis.client.bool_ok, RANGE_CMD: parse_range, REVRANGE_CMD: parse_range, MRANGE_CMD: parse_m_range, @@ -57,5 +57,30 @@ class TimeSeries(TimeSeriesCommands): self.execute_command = client.execute_command self.MODULE_VERSION = version - for k in MODULE_CALLBACKS: - self.client.set_response_callback(k, MODULE_CALLBACKS[k]) + for key, value in self.MODULE_CALLBACKS.items(): + self.client.set_response_callback(key, value) + + def pipeline(self, transaction=True, shard_hint=None): + """Creates a pipeline for the TimeSeries module, that can be used + for executing only TimeSeries commands and core commands. + + Usage example: + + r = redis.Redis() + pipe = r.ts().pipeline() + for i in range(100): + pipeline.add("with_pipeline", i, 1.1 * i) + pipeline.execute() + + """ + p = Pipeline( + connection_pool=self.client.connection_pool, + response_callbacks=self.MODULE_CALLBACKS, + transaction=transaction, + shard_hint=shard_hint, + ) + return p + + +class Pipeline(TimeSeriesCommands, redis.client.Pipeline): + """Pipeline for the module.""" diff --git a/tests/test_json.py b/tests/test_json.py index 19b0c32..b3f38f7 100644 --- a/tests/test_json.py +++ b/tests/test_json.py @@ -275,16 +275,28 @@ def test_objlen(client): assert len(obj) == client.json().objlen("obj") -# @pytest.mark.pipeline -# @pytest.mark.redismod -# def test_pipelineshouldsucceed(client): -# p = client.json().pipeline() -# p.set("foo", Path.rootPath(), "bar") -# p.get("foo") -# p.delete("foo") -# assert [True, "bar", 1] == p.execute() -# assert client.keys() == [] -# assert client.get("foo") is None +@pytest.mark.pipeline +@pytest.mark.redismod +def test_json_commands_in_pipeline(client): + p = client.json().pipeline() + p.set("foo", Path.rootPath(), "bar") + p.get("foo") + p.delete("foo") + assert [True, "bar", 1] == p.execute() + assert client.keys() == [] + assert client.get("foo") is None + + # now with a true, json object + client.flushdb() + p = client.json().pipeline() + d = {"hello": "world", "oh": "snap"} + p.jsonset("foo", Path.rootPath(), d) + p.jsonget("foo") + p.exists("notarealkey") + p.delete("foo") + assert [True, d, 0, 1] == p.execute() + assert client.keys() == [] + assert client.get("foo") is None @pytest.mark.redismod diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index b2df3fe..d3d474f 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -49,10 +49,6 @@ def testAlter(client): assert 10 == client.ts().info(1).retention_msecs -# pipe = client.ts().pipeline() -# assert pipe.create(2) - - @pytest.mark.redismod @skip_ifmodversion_lt("1.4.0", "timeseries") def testAlterDiplicatePolicy(client): @@ -568,20 +564,19 @@ def testQueryIndex(client): assert [2] == client.ts().queryindex(["Taste=That"]) -# -# @pytest.mark.redismod -# @pytest.mark.pipeline -# def testPipeline(client): -# pipeline = client.ts().pipeline() -# pipeline.create("with_pipeline") -# for i in range(100): -# pipeline.add("with_pipeline", i, 1.1 * i) -# pipeline.execute() - -# info = client.ts().info("with_pipeline") -# assert info.lastTimeStamp == 99 -# assert info.total_samples == 100 -# assert client.ts().get("with_pipeline")[1] == 99 * 1.1 +@pytest.mark.redismod +@pytest.mark.pipeline +def test_pipeline(client): + pipeline = client.ts().pipeline() + pipeline.create("with_pipeline") + for i in range(100): + pipeline.add("with_pipeline", i, 1.1 * i) + pipeline.execute() + + info = client.ts().info("with_pipeline") + assert info.lastTimeStamp == 99 + assert info.total_samples == 100 + assert client.ts().get("with_pipeline")[1] == 99 * 1.1 @pytest.mark.redismod diff --git a/tox.ini b/tox.ini index f09b3a8..1d4da08 100644 --- a/tox.ini +++ b/tox.ini @@ -121,6 +121,13 @@ basepython = pypy3 [flake8] exclude = - .venv, + *.egg-info, + *.pyc, + .git, .tox, - whitelist.py + .venv*, + build, + dist, + docker, + venv*, + whitelist.py \ No newline at end of file -- cgit v1.2.1