diff options
author | Chayim <chayim@users.noreply.github.com> | 2021-10-25 17:06:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-25 17:06:04 +0300 |
commit | 3946da29d7e451a20289fb6e282516fa24e402af (patch) | |
tree | 25cf4b73b4e00d66c75288790616ea882823e2b7 /redis | |
parent | 0ef4c0711693b4b313ce97261214bd151d8261d5 (diff) | |
download | redis-py-3946da29d7e451a20289fb6e282516fa24e402af.tar.gz |
redisjson support (#1636)
Diffstat (limited to 'redis')
-rwxr-xr-x | redis/client.py | 48 | ||||
-rw-r--r-- | redis/commands/__init__.py | 11 | ||||
-rw-r--r-- | redis/commands/core.py (renamed from redis/commands.py) | 124 | ||||
-rw-r--r-- | redis/commands/helpers.py | 25 | ||||
-rw-r--r-- | redis/commands/json/__init__.py | 84 | ||||
-rw-r--r-- | redis/commands/json/commands.py | 197 | ||||
-rw-r--r-- | redis/commands/json/helpers.py | 25 | ||||
-rw-r--r-- | redis/commands/json/path.py | 21 | ||||
-rw-r--r-- | redis/commands/redismodules.py | 17 | ||||
-rw-r--r-- | redis/commands/sentinel.py | 97 |
10 files changed, 526 insertions, 123 deletions
diff --git a/redis/client.py b/redis/client.py index a6bd183..4db9887 100755 --- a/redis/client.py +++ b/redis/client.py @@ -6,10 +6,7 @@ import re import threading import time import warnings -from redis.commands import ( - list_or_args, - Commands -) +from redis.commands import CoreCommands, RedisModuleCommands, list_or_args from redis.connection import (ConnectionPool, UnixDomainSocketConnection, SSLConnection) from redis.lock import Lock @@ -609,7 +606,7 @@ def parse_set_result(response, **options): return response and str_if_bytes(response) == 'OK' -class Redis(Commands, object): +class Redis(RedisModuleCommands, CoreCommands, object): """ Implementation of the Redis protocol. @@ -898,6 +895,47 @@ class Redis(Commands, object): "Set a custom Response Callback" self.response_callbacks[command] = callback + def load_external_module(self, modname, funcname, func): + """ + This function can be used to add externally defined redis modules, + and their namespaces to the redis client. + modname - A string containing the name of the redis module to look for + in the redis info block. + funcname - A string containing the name of the function to create + func - The function, being added to this class. + + ex: Assume that one has a custom redis module named foomod that + creates command named 'foo.dothing' and 'foo.anotherthing' in redis. + To load function functions into this namespace: + + from redis import Redis + from foomodule import F + r = Redis() + r.load_external_module("foomod", "foo", F) + r.foo().dothing('your', 'arguments') + + For a concrete example see the reimport of the redisjson module in + tests/test_connection.py::test_loading_external_modules + """ + mods = self.loaded_modules + if modname.lower() not in mods: + raise ModuleError("{} is not loaded in redis.".format(modname)) + setattr(self, funcname, func) + + @property + def loaded_modules(self): + key = '__redis_modules__' + mods = getattr(self, key, None) + if mods is not None: + return mods + + try: + mods = [f.get('name').lower() for f in self.info().get('modules')] + except TypeError: + mods = [] + setattr(self, key, mods) + return mods + def pipeline(self, transaction=True, shard_hint=None): """ Return a new pipeline object that can queue multiple commands for diff --git a/redis/commands/__init__.py b/redis/commands/__init__.py new file mode 100644 index 0000000..f1ddaaa --- /dev/null +++ b/redis/commands/__init__.py @@ -0,0 +1,11 @@ +from .core import CoreCommands +from .redismodules import RedisModuleCommands +from .helpers import list_or_args +from .sentinel import SentinelCommands + +__all__ = [ + 'CoreCommands', + 'RedisModuleCommands', + 'SentinelCommands', + 'list_or_args' +] diff --git a/redis/commands.py b/redis/commands/core.py index 2697e78..6512b45 100644 --- a/redis/commands.py +++ b/redis/commands/core.py @@ -3,6 +3,7 @@ import time import warnings import hashlib +from .helpers import list_or_args from redis.exceptions import ( ConnectionError, DataError, @@ -11,24 +12,7 @@ from redis.exceptions import ( ) -def list_or_args(keys, args): - # returns a single new list combining keys and args - try: - iter(keys) - # a string or bytes instance can be iterated, but indicates - # keys wasn't passed as a list - if isinstance(keys, (bytes, str)): - keys = [keys] - else: - keys = list(keys) - except TypeError: - keys = [keys] - if args: - keys.extend(args) - return keys - - -class Commands: +class CoreCommands: """ A class containing all of the implemented redis commands. This class is to be used as a mixin. @@ -1173,16 +1157,16 @@ class Commands: if ex is not None: pieces.append('EX') if isinstance(ex, datetime.timedelta): - ex = int(ex.total_seconds()) - if isinstance(ex, int): + pieces.append(int(ex.total_seconds())) + elif isinstance(ex, int): pieces.append(ex) else: raise DataError("ex must be datetime.timedelta or int") if px is not None: pieces.append('PX') if isinstance(px, datetime.timedelta): - px = int(px.total_seconds() * 1000) - if isinstance(px, int): + pieces.append(int(px.total_seconds() * 1000)) + elif isinstance(px, int): pieces.append(px) else: raise DataError("px must be datetime.timedelta or int") @@ -3413,99 +3397,3 @@ class BitFieldOperation: command = self.command self.reset() return self.client.execute_command(*command) - - -class SentinelCommands: - """ - A class containing the commands specific to redis sentinal. This class is - to be used as a mixin. - """ - - def sentinel(self, *args): - "Redis Sentinel's SENTINEL command." - warnings.warn( - DeprecationWarning('Use the individual sentinel_* methods')) - - def sentinel_get_master_addr_by_name(self, service_name): - "Returns a (host, port) pair for the given ``service_name``" - return self.execute_command('SENTINEL GET-MASTER-ADDR-BY-NAME', - service_name) - - def sentinel_master(self, service_name): - "Returns a dictionary containing the specified masters state." - return self.execute_command('SENTINEL MASTER', service_name) - - def sentinel_masters(self): - "Returns a list of dictionaries containing each master's state." - return self.execute_command('SENTINEL MASTERS') - - def sentinel_monitor(self, name, ip, port, quorum): - "Add a new master to Sentinel to be monitored" - return self.execute_command('SENTINEL MONITOR', name, ip, port, quorum) - - def sentinel_remove(self, name): - "Remove a master from Sentinel's monitoring" - return self.execute_command('SENTINEL REMOVE', name) - - def sentinel_sentinels(self, service_name): - "Returns a list of sentinels for ``service_name``" - return self.execute_command('SENTINEL SENTINELS', service_name) - - def sentinel_set(self, name, option, value): - "Set Sentinel monitoring parameters for a given master" - return self.execute_command('SENTINEL SET', name, option, value) - - def sentinel_slaves(self, service_name): - "Returns a list of slaves for ``service_name``" - return self.execute_command('SENTINEL SLAVES', service_name) - - def sentinel_reset(self, pattern): - """ - This command will reset all the masters with matching name. - The pattern argument is a glob-style pattern. - - The reset process clears any previous state in a master (including a - failover in progress), and removes every slave and sentinel already - discovered and associated with the master. - """ - return self.execute_command('SENTINEL RESET', pattern, once=True) - - def sentinel_failover(self, new_master_name): - """ - Force a failover as if the master was not reachable, and without - asking for agreement to other Sentinels (however a new version of the - configuration will be published so that the other Sentinels will - update their configurations). - """ - return self.execute_command('SENTINEL FAILOVER', new_master_name) - - def sentinel_ckquorum(self, new_master_name): - """ - Check if the current Sentinel configuration is able to reach the - quorum needed to failover a master, and the majority needed to - authorize the failover. - - This command should be used in monitoring systems to check if a - Sentinel deployment is ok. - """ - return self.execute_command('SENTINEL CKQUORUM', - new_master_name, - once=True) - - def sentinel_flushconfig(self): - """ - Force Sentinel to rewrite its configuration on disk, including the - current Sentinel state. - - Normally Sentinel rewrites the configuration every time something - changes in its state (in the context of the subset of the state which - is persisted on disk across restart). - However sometimes it is possible that the configuration file is lost - because of operation errors, disk failures, package upgrade scripts or - configuration managers. In those cases a way to to force Sentinel to - rewrite the configuration file is handy. - - This command works even if the previous configuration file is - completely missing. - """ - return self.execute_command('SENTINEL FLUSHCONFIG') diff --git a/redis/commands/helpers.py b/redis/commands/helpers.py new file mode 100644 index 0000000..b012621 --- /dev/null +++ b/redis/commands/helpers.py @@ -0,0 +1,25 @@ +def list_or_args(keys, args): + # returns a single new list combining keys and args + try: + iter(keys) + # a string or bytes instance can be iterated, but indicates + # keys wasn't passed as a list + if isinstance(keys, (bytes, str)): + keys = [keys] + else: + keys = list(keys) + except TypeError: + keys = [keys] + if args: + keys.extend(args) + return keys + + +def nativestr(x): + """Return the decoded binary string, or a string, depending on type.""" + return x.decode("utf-8", "replace") if isinstance(x, bytes) else x + + +def delist(x): + """Given a list of binaries, return the stringified version.""" + return [nativestr(obj) for obj in x] diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py new file mode 100644 index 0000000..92f1199 --- /dev/null +++ b/redis/commands/json/__init__.py @@ -0,0 +1,84 @@ +# from typing import Optional +from json import JSONDecoder, JSONEncoder + +# # from redis.client import Redis + +from .helpers import bulk_of_jsons +from ..helpers import nativestr, delist +from .commands import JSONCommands +# from ..feature import AbstractFeature + + +class JSON(JSONCommands): + """ + Create a client for talking to json. + + :param decoder: + :type json.JSONDecoder: An instance of json.JSONDecoder + + :param encoder: + :type json.JSONEncoder: An instance of json.JSONEncoder + """ + + def __init__( + self, + client, + decoder=JSONDecoder(), + encoder=JSONEncoder(), + ): + """ + Create a client for talking to json. + + :param decoder: + :type json.JSONDecoder: An instance of json.JSONDecoder + + :param encoder: + :type json.JSONEncoder: An instance of json.JSONEncoder + """ + # Set the module commands' callbacks + self.MODULE_CALLBACKS = { + "JSON.CLEAR": int, + "JSON.DEL": int, + "JSON.FORGET": int, + "JSON.GET": self._decode, + "JSON.MGET": bulk_of_jsons(self._decode), + "JSON.SET": lambda r: r and nativestr(r) == "OK", + "JSON.NUMINCRBY": self._decode, + "JSON.NUMMULTBY": self._decode, + "JSON.TOGGLE": lambda b: b == b"true", + "JSON.STRAPPEND": int, + "JSON.STRLEN": int, + "JSON.ARRAPPEND": int, + "JSON.ARRINDEX": int, + "JSON.ARRINSERT": int, + "JSON.ARRLEN": int, + "JSON.ARRPOP": self._decode, + "JSON.ARRTRIM": int, + "JSON.OBJLEN": int, + "JSON.OBJKEYS": delist, + # "JSON.RESP": delist, + "JSON.DEBUG": int, + } + + self.client = client + self.execute_command = client.execute_command + + for key, value in self.MODULE_CALLBACKS.items(): + self.client.set_response_callback(key, value) + + self.__encoder__ = encoder + self.__decoder__ = decoder + + def _decode(self, obj): + """Get the decoder.""" + if obj is None: + return obj + + try: + return self.__decoder__.decode(obj) + except TypeError: + return self.__decoder__.decode(obj.decode()) + + def _encode(self, obj): + """Get the encoder.""" + return self.__encoder__.encode(obj) diff --git a/redis/commands/json/commands.py b/redis/commands/json/commands.py new file mode 100644 index 0000000..2f8039f --- /dev/null +++ b/redis/commands/json/commands.py @@ -0,0 +1,197 @@ +from .path import Path, str_path +from .helpers import decode_dict_keys + + +class JSONCommands: + """json commands.""" + + def arrappend(self, name, path=Path.rootPath(), *args): + """Append the objects ``args`` to the array under the + ``path` in key ``name``. + """ + pieces = [name, str_path(path)] + for o in args: + pieces.append(self._encode(o)) + return self.execute_command("JSON.ARRAPPEND", *pieces) + + def arrindex(self, name, path, scalar, start=0, stop=-1): + """ + Return the index of ``scalar`` in the JSON array under ``path`` at key + ``name``. + + The search can be limited using the optional inclusive ``start`` + and exclusive ``stop`` indices. + """ + return self.execute_command( + "JSON.ARRINDEX", name, str_path(path), self._encode(scalar), + start, stop + ) + + def arrinsert(self, name, path, index, *args): + """Insert the objects ``args`` to the array at index ``index`` + under the ``path` in key ``name``. + """ + pieces = [name, str_path(path), index] + for o in args: + pieces.append(self._encode(o)) + return self.execute_command("JSON.ARRINSERT", *pieces) + + def forget(self, name, path=Path.rootPath()): + """Alias for jsondel (delete the JSON value).""" + return self.execute_command("JSON.FORGET", name, str_path(path)) + + def arrlen(self, name, path=Path.rootPath()): + """Return the length of the array JSON value under ``path`` + at key``name``. + """ + return self.execute_command("JSON.ARRLEN", name, str_path(path)) + + def arrpop(self, name, path=Path.rootPath(), index=-1): + """Pop the element at ``index`` in the array JSON value under + ``path`` at key ``name``. + """ + return self.execute_command("JSON.ARRPOP", name, str_path(path), index) + + def arrtrim(self, name, path, start, stop): + """Trim the array JSON value under ``path`` at key ``name`` to the + inclusive range given by ``start`` and ``stop``. + """ + return self.execute_command("JSON.ARRTRIM", name, str_path(path), + start, stop) + + def type(self, name, path=Path.rootPath()): + """Get the type of the JSON value under ``path`` from key ``name``.""" + return self.execute_command("JSON.TYPE", name, str_path(path)) + + def resp(self, name, path=Path.rootPath()): + """Return the JSON value under ``path`` at key ``name``.""" + return self.execute_command("JSON.RESP", name, str_path(path)) + + def objkeys(self, name, path=Path.rootPath()): + """Return the key names in the dictionary JSON value under ``path`` at + key ``name``.""" + return self.execute_command("JSON.OBJKEYS", name, str_path(path)) + + def objlen(self, name, path=Path.rootPath()): + """Return the length of the dictionary JSON value under ``path`` at key + ``name``. + """ + return self.execute_command("JSON.OBJLEN", name, str_path(path)) + + def numincrby(self, name, path, number): + """Increment the numeric (integer or floating point) JSON value under + ``path`` at key ``name`` by the provided ``number``. + """ + return self.execute_command( + "JSON.NUMINCRBY", name, str_path(path), self._encode(number) + ) + + def nummultby(self, name, path, number): + """Multiply the numeric (integer or floating point) JSON value under + ``path`` at key ``name`` with the provided ``number``. + """ + return self.execute_command( + "JSON.NUMMULTBY", name, str_path(path), self._encode(number) + ) + + def clear(self, name, path=Path.rootPath()): + """ + Empty arrays and objects (to have zero slots/keys without deleting the + array/object). + + Return the count of cleared paths (ignoring non-array and non-objects + paths). + """ + return self.execute_command("JSON.CLEAR", name, str_path(path)) + + def delete(self, name, path=Path.rootPath()): + """Delete the JSON value stored at key ``name`` under ``path``.""" + return self.execute_command("JSON.DEL", name, str_path(path)) + + def get(self, name, *args, no_escape=False): + """ + Get the object stored as a JSON value at key ``name``. + + ``args`` is zero or more paths, and defaults to root path + ```no_escape`` is a boolean flag to add no_escape option to get + non-ascii characters + """ + pieces = [name] + if no_escape: + pieces.append("noescape") + + if len(args) == 0: + pieces.append(Path.rootPath()) + + else: + for p in args: + pieces.append(str_path(p)) + + # Handle case where key doesn't exist. The JSONDecoder would raise a + # TypeError exception since it can't decode None + try: + return self.execute_command("JSON.GET", *pieces) + except TypeError: + return None + + def mget(self, path, *args): + """Get the objects stored as a JSON values under ``path`` from keys + ``args``. + """ + pieces = [] + pieces.extend(args) + pieces.append(str_path(path)) + return self.execute_command("JSON.MGET", *pieces) + + def set(self, name, path, obj, nx=False, xx=False, decode_keys=False): + """ + Set the JSON value at key ``name`` under the ``path`` to ``obj``. + + ``nx`` if set to True, set ``value`` only if it does not exist. + ``xx`` if set to True, set ``value`` only if it exists. + ``decode_keys`` If set to True, the keys of ``obj`` will be decoded + with utf-8. + """ + if decode_keys: + obj = decode_dict_keys(obj) + + pieces = [name, str_path(path), self._encode(obj)] + + # Handle existential modifiers + if nx and xx: + raise Exception( + "nx and xx are mutually exclusive: use one, the " + "other or neither - but not both" + ) + elif nx: + pieces.append("NX") + elif xx: + pieces.append("XX") + return self.execute_command("JSON.SET", *pieces) + + def strlen(self, name, path=Path.rootPath()): + """Return the length of the string JSON value under ``path`` at key + ``name``. + """ + return self.execute_command("JSON.STRLEN", name, str_path(path)) + + def toggle(self, name, path=Path.rootPath()): + """Toggle boolean value under ``path`` at key ``name``. + returning the new value. + """ + return self.execute_command("JSON.TOGGLE", name, str_path(path)) + + def strappend(self, name, string, path=Path.rootPath()): + """Append to the string JSON value under ``path`` at key ``name`` + the provided ``string``. + """ + return self.execute_command( + "JSON.STRAPPEND", name, str_path(path), self._encode(string) + ) + + def debug(self, name, path=Path.rootPath()): + """Return the memory usage in bytes of a value under ``path`` from + key ``name``. + """ + return self.execute_command("JSON.DEBUG", "MEMORY", + name, str_path(path)) diff --git a/redis/commands/json/helpers.py b/redis/commands/json/helpers.py new file mode 100644 index 0000000..8fb20d9 --- /dev/null +++ b/redis/commands/json/helpers.py @@ -0,0 +1,25 @@ +import copy + + +def bulk_of_jsons(d): + """Replace serialized JSON values with objects in a + bulk array response (list). + """ + + def _f(b): + for index, item in enumerate(b): + if item is not None: + b[index] = d(item) + return b + + return _f + + +def decode_dict_keys(obj): + """Decode the keys of the given dictionary with utf-8.""" + newobj = copy.copy(obj) + for k in obj.keys(): + if isinstance(k, bytes): + newobj[k.decode("utf-8")] = newobj[k] + newobj.pop(k) + return newobj diff --git a/redis/commands/json/path.py b/redis/commands/json/path.py new file mode 100644 index 0000000..dff8648 --- /dev/null +++ b/redis/commands/json/path.py @@ -0,0 +1,21 @@ +def str_path(p): + """Return the string representation of a path if it is of class Path.""" + if isinstance(p, Path): + return p.strPath + else: + return p + + +class Path(object): + """This class represents a path in a JSON value.""" + + strPath = "" + + @staticmethod + def rootPath(): + """Return the root path's string representation.""" + return "." + + def __init__(self, path): + """Make a new path based on the string representation in `path`.""" + self.strPath = path diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py new file mode 100644 index 0000000..fb53107 --- /dev/null +++ b/redis/commands/redismodules.py @@ -0,0 +1,17 @@ +from json import JSONEncoder, JSONDecoder +from redis.exceptions import ModuleError + + +class RedisModuleCommands: + """This class contains the wrapper functions to bring supported redis + modules into the command namepsace. + """ + + def json(self, encoder=JSONEncoder(), decoder=JSONDecoder()): + """Access the json namespace, providing support for redis json.""" + if 'rejson' not in self.loaded_modules: + raise ModuleError("rejson is not a loaded in the redis instance.") + + from .json import JSON + jj = JSON(client=self, encoder=encoder, decoder=decoder) + return jj diff --git a/redis/commands/sentinel.py b/redis/commands/sentinel.py new file mode 100644 index 0000000..1f02984 --- /dev/null +++ b/redis/commands/sentinel.py @@ -0,0 +1,97 @@ +import warnings + + +class SentinelCommands: + """ + A class containing the commands specific to redis sentinal. This class is + to be used as a mixin. + """ + + def sentinel(self, *args): + "Redis Sentinel's SENTINEL command." + warnings.warn( + DeprecationWarning('Use the individual sentinel_* methods')) + + def sentinel_get_master_addr_by_name(self, service_name): + "Returns a (host, port) pair for the given ``service_name``" + return self.execute_command('SENTINEL GET-MASTER-ADDR-BY-NAME', + service_name) + + def sentinel_master(self, service_name): + "Returns a dictionary containing the specified masters state." + return self.execute_command('SENTINEL MASTER', service_name) + + def sentinel_masters(self): + "Returns a list of dictionaries containing each master's state." + return self.execute_command('SENTINEL MASTERS') + + def sentinel_monitor(self, name, ip, port, quorum): + "Add a new master to Sentinel to be monitored" + return self.execute_command('SENTINEL MONITOR', name, ip, port, quorum) + + def sentinel_remove(self, name): + "Remove a master from Sentinel's monitoring" + return self.execute_command('SENTINEL REMOVE', name) + + def sentinel_sentinels(self, service_name): + "Returns a list of sentinels for ``service_name``" + return self.execute_command('SENTINEL SENTINELS', service_name) + + def sentinel_set(self, name, option, value): + "Set Sentinel monitoring parameters for a given master" + return self.execute_command('SENTINEL SET', name, option, value) + + def sentinel_slaves(self, service_name): + "Returns a list of slaves for ``service_name``" + return self.execute_command('SENTINEL SLAVES', service_name) + + def sentinel_reset(self, pattern): + """ + This command will reset all the masters with matching name. + The pattern argument is a glob-style pattern. + + The reset process clears any previous state in a master (including a + failover in progress), and removes every slave and sentinel already + discovered and associated with the master. + """ + return self.execute_command('SENTINEL RESET', pattern, once=True) + + def sentinel_failover(self, new_master_name): + """ + Force a failover as if the master was not reachable, and without + asking for agreement to other Sentinels (however a new version of the + configuration will be published so that the other Sentinels will + update their configurations). + """ + return self.execute_command('SENTINEL FAILOVER', new_master_name) + + def sentinel_ckquorum(self, new_master_name): + """ + Check if the current Sentinel configuration is able to reach the + quorum needed to failover a master, and the majority needed to + authorize the failover. + + This command should be used in monitoring systems to check if a + Sentinel deployment is ok. + """ + return self.execute_command('SENTINEL CKQUORUM', + new_master_name, + once=True) + + def sentinel_flushconfig(self): + """ + Force Sentinel to rewrite its configuration on disk, including the + current Sentinel state. + + Normally Sentinel rewrites the configuration every time something + changes in its state (in the context of the subset of the state which + is persisted on disk across restart). + However sometimes it is possible that the configuration file is lost + because of operation errors, disk failures, package upgrade scripts or + configuration managers. In those cases a way to to force Sentinel to + rewrite the configuration file is handy. + + This command works even if the previous configuration file is + completely missing. + """ + return self.execute_command('SENTINEL FLUSHCONFIG') |