summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChayim I. Kirshen <c@kirshen.com>2021-11-09 10:57:20 +0200
committerChayim I. Kirshen <c@kirshen.com>2021-11-09 10:57:20 +0200
commit0e73a4e510b62026164f85e35997e0d1e913e3a4 (patch)
treec41aa89b1a6d2aa3aaf7684b36b4a932c93881bd
parentbba75187931af84dd21c91bcf1b3bd422c9aed72 (diff)
downloadredis-py-ck-jsonts-pipelines.tar.gz
Migrating pipeline support for JSON and TSck-jsonts-pipelines
-rw-r--r--redis/commands/helpers.py32
-rw-r--r--redis/commands/json/__init__.py27
-rw-r--r--redis/commands/json/commands.py18
-rw-r--r--redis/commands/timeseries/__init__.py41
-rw-r--r--tests/test_json.py32
-rw-r--r--tests/test_timeseries.py31
-rw-r--r--tox.ini11
7 files changed, 154 insertions, 38 deletions
diff --git a/redis/commands/helpers.py b/redis/commands/helpers.py
index 2a4298c..22cb622 100644
--- a/redis/commands/helpers.py
+++ b/redis/commands/helpers.py
@@ -1,3 +1,7 @@
+import random
+import string
+
+
def list_or_args(keys, args):
# returns a single new list combining keys and args
try:
@@ -42,3 +46,31 @@ def parse_to_list(response):
except TypeError:
res.append(None)
return res
+
+
+def random_string(length=10):
+ """
+ Returns a random N character long string.
+ """
+ return "".join( # nosec
+ random.choice(string.ascii_lowercase) for x in range(length)
+ )
+
+
+def quote_string(v):
+ """
+ RedisGraph strings must be quoted,
+ quote_string wraps given v with quotes incase
+ v is a string.
+ """
+
+ if isinstance(v, bytes):
+ v = v.decode()
+ elif not isinstance(v, str):
+ return v
+ if len(v) == 0:
+ return '""'
+
+ v = v.replace('"', '\\"')
+
+ return '"{}"'.format(v)
diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py
index d00627e..d634dbd 100644
--- a/redis/commands/json/__init__.py
+++ b/redis/commands/json/__init__.py
@@ -6,6 +6,7 @@ from .decoders import (
)
from ..helpers import nativestr
from .commands import JSONCommands
+import redis
class JSON(JSONCommands):
@@ -91,3 +92,29 @@ class JSON(JSONCommands):
def _encode(self, obj):
"""Get the encoder."""
return self.__encoder__.encode(obj)
+
+ def pipeline(self, transaction=True, shard_hint=None):
+ """Creates a pipeline for the JSON module, that can be used for executing
+ JSON commands, as well as classic core commands.
+
+ Usage example:
+
+ r = redis.Redis()
+ pipe = r.json().pipeline()
+ pipe.jsonset('foo', '.', {'hello!': 'world'})
+ pipe.jsonget('foo')
+ pipe.jsonget('notakey')
+ """
+ p = Pipeline(
+ connection_pool=self.client.connection_pool,
+ response_callbacks=self.MODULE_CALLBACKS,
+ transaction=transaction,
+ shard_hint=shard_hint,
+ )
+ p._encode = self._encode
+ p._decode = self._decode
+ return p
+
+
+class Pipeline(JSONCommands, redis.client.Pipeline):
+ """Pipeline for the module."""
diff --git a/redis/commands/json/commands.py b/redis/commands/json/commands.py
index 716741c..4436f6a 100644
--- a/redis/commands/json/commands.py
+++ b/redis/commands/json/commands.py
@@ -154,6 +154,9 @@ class JSONCommands:
``xx`` if set to True, set ``value`` only if it exists.
``decode_keys`` If set to True, the keys of ``obj`` will be decoded
with utf-8.
+
+ For the purpose of using this within a pipeline, this command is also
+ aliased to jsonset.
"""
if decode_keys:
obj = decode_dict_keys(obj)
@@ -212,3 +215,18 @@ class JSONCommands:
pieces.append(key)
pieces.append(str(path))
return self.execute_command("JSON.DEBUG", *pieces)
+
+ @deprecated(version='4.0.0',
+ reason='redisjson-py supported this, call get directly.')
+ def jsonget(self, *args, **kwargs):
+ return self.get(*args, **kwargs)
+
+ @deprecated(version='4.0.0',
+ reason='redisjson-py supported this, call get directly.')
+ def jsonmget(self, *args, **kwargs):
+ return self.mget(*args, **kwargs)
+
+ @deprecated(version='4.0.0',
+ reason='redisjson-py supported this, call get directly.')
+ def jsonset(self, *args, **kwargs):
+ return self.set(*args, **kwargs)
diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py
index db9c3a5..83fa170 100644
--- a/redis/commands/timeseries/__init__.py
+++ b/redis/commands/timeseries/__init__.py
@@ -1,4 +1,4 @@
-from redis.client import bool_ok
+import redis.client
from .utils import (
parse_range,
@@ -37,12 +37,12 @@ class TimeSeries(TimeSeriesCommands):
def __init__(self, client=None, version=None, **kwargs):
"""Create a new RedisTimeSeries client."""
# Set the module commands' callbacks
- MODULE_CALLBACKS = {
- CREATE_CMD: bool_ok,
- ALTER_CMD: bool_ok,
- CREATERULE_CMD: bool_ok,
+ self.MODULE_CALLBACKS = {
+ CREATE_CMD: redis.client.bool_ok,
+ ALTER_CMD: redis.client.bool_ok,
+ CREATERULE_CMD: redis.client.bool_ok,
DEL_CMD: int,
- DELETERULE_CMD: bool_ok,
+ DELETERULE_CMD: redis.client.bool_ok,
RANGE_CMD: parse_range,
REVRANGE_CMD: parse_range,
MRANGE_CMD: parse_m_range,
@@ -57,5 +57,30 @@ class TimeSeries(TimeSeriesCommands):
self.execute_command = client.execute_command
self.MODULE_VERSION = version
- for k in MODULE_CALLBACKS:
- self.client.set_response_callback(k, MODULE_CALLBACKS[k])
+ for key, value in self.MODULE_CALLBACKS.items():
+ self.client.set_response_callback(key, value)
+
+ def pipeline(self, transaction=True, shard_hint=None):
+ """Creates a pipeline for the TimeSeries module, that can be used
+ for executing only TimeSeries commands and core commands.
+
+ Usage example:
+
+ r = redis.Redis()
+ pipe = r.ts().pipeline()
+ for i in range(100):
+ pipeline.add("with_pipeline", i, 1.1 * i)
+ pipeline.execute()
+
+ """
+ p = Pipeline(
+ connection_pool=self.client.connection_pool,
+ response_callbacks=self.MODULE_CALLBACKS,
+ transaction=transaction,
+ shard_hint=shard_hint,
+ )
+ return p
+
+
+class Pipeline(TimeSeriesCommands, redis.client.Pipeline):
+ """Pipeline for the module."""
diff --git a/tests/test_json.py b/tests/test_json.py
index 19b0c32..b3f38f7 100644
--- a/tests/test_json.py
+++ b/tests/test_json.py
@@ -275,16 +275,28 @@ def test_objlen(client):
assert len(obj) == client.json().objlen("obj")
-# @pytest.mark.pipeline
-# @pytest.mark.redismod
-# def test_pipelineshouldsucceed(client):
-# p = client.json().pipeline()
-# p.set("foo", Path.rootPath(), "bar")
-# p.get("foo")
-# p.delete("foo")
-# assert [True, "bar", 1] == p.execute()
-# assert client.keys() == []
-# assert client.get("foo") is None
+@pytest.mark.pipeline
+@pytest.mark.redismod
+def test_json_commands_in_pipeline(client):
+ p = client.json().pipeline()
+ p.set("foo", Path.rootPath(), "bar")
+ p.get("foo")
+ p.delete("foo")
+ assert [True, "bar", 1] == p.execute()
+ assert client.keys() == []
+ assert client.get("foo") is None
+
+ # now with a true, json object
+ client.flushdb()
+ p = client.json().pipeline()
+ d = {"hello": "world", "oh": "snap"}
+ p.jsonset("foo", Path.rootPath(), d)
+ p.jsonget("foo")
+ p.exists("notarealkey")
+ p.delete("foo")
+ assert [True, d, 0, 1] == p.execute()
+ assert client.keys() == []
+ assert client.get("foo") is None
@pytest.mark.redismod
diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py
index b2df3fe..d3d474f 100644
--- a/tests/test_timeseries.py
+++ b/tests/test_timeseries.py
@@ -49,10 +49,6 @@ def testAlter(client):
assert 10 == client.ts().info(1).retention_msecs
-# pipe = client.ts().pipeline()
-# assert pipe.create(2)
-
-
@pytest.mark.redismod
@skip_ifmodversion_lt("1.4.0", "timeseries")
def testAlterDiplicatePolicy(client):
@@ -568,20 +564,19 @@ def testQueryIndex(client):
assert [2] == client.ts().queryindex(["Taste=That"])
-#
-# @pytest.mark.redismod
-# @pytest.mark.pipeline
-# def testPipeline(client):
-# pipeline = client.ts().pipeline()
-# pipeline.create("with_pipeline")
-# for i in range(100):
-# pipeline.add("with_pipeline", i, 1.1 * i)
-# pipeline.execute()
-
-# info = client.ts().info("with_pipeline")
-# assert info.lastTimeStamp == 99
-# assert info.total_samples == 100
-# assert client.ts().get("with_pipeline")[1] == 99 * 1.1
+@pytest.mark.redismod
+@pytest.mark.pipeline
+def test_pipeline(client):
+ pipeline = client.ts().pipeline()
+ pipeline.create("with_pipeline")
+ for i in range(100):
+ pipeline.add("with_pipeline", i, 1.1 * i)
+ pipeline.execute()
+
+ info = client.ts().info("with_pipeline")
+ assert info.lastTimeStamp == 99
+ assert info.total_samples == 100
+ assert client.ts().get("with_pipeline")[1] == 99 * 1.1
@pytest.mark.redismod
diff --git a/tox.ini b/tox.ini
index f09b3a8..1d4da08 100644
--- a/tox.ini
+++ b/tox.ini
@@ -121,6 +121,13 @@ basepython = pypy3
[flake8]
exclude =
- .venv,
+ *.egg-info,
+ *.pyc,
+ .git,
.tox,
- whitelist.py
+ .venv*,
+ build,
+ dist,
+ docker,
+ venv*,
+ whitelist.py \ No newline at end of file