diff options
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 101 |
1 files changed, 49 insertions, 52 deletions
diff --git a/redis/client.py b/redis/client.py index dc6693d..9f2907e 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1,7 +1,6 @@ from itertools import chain import copy import datetime -import hashlib import re import threading import time @@ -15,7 +14,6 @@ from redis.exceptions import ( ConnectionError, ExecAbortError, ModuleError, - NoScriptError, PubSubError, RedisError, ResponseError, @@ -27,6 +25,9 @@ from redis.utils import safe_str, str_if_bytes SYM_EMPTY = b'' EMPTY_RESPONSE = 'EMPTY_RESPONSE' +# some responses (ie. dump) are binary, and just meant to never be decoded +NEVER_DECODE = 'NEVER_DECODE' + def timestamp_to_datetime(response): "Converts a unix timestamp to a Python datetime object" @@ -461,6 +462,7 @@ def _parse_node_line(line): line_items = line.split(' ') node_id, addr, flags, master_id, ping, pong, epoch, \ connected = line.split(' ')[:8] + addr = addr.split('@')[0] slots = [sl.split('-') for sl in line_items[8:]] node_dict = { 'node_id': node_id, @@ -476,8 +478,13 @@ def _parse_node_line(line): def parse_cluster_nodes(response, **options): - raw_lines = str_if_bytes(response).splitlines() - return dict(_parse_node_line(line) for line in raw_lines) + """ + @see: https://redis.io/commands/cluster-nodes # string + @see: https://redis.io/commands/cluster-replicas # list of string + """ + if isinstance(response, str): + response = response.splitlines() + return dict(_parse_node_line(str_if_bytes(node)) for node in response) def parse_geosearch_generic(response, **options): @@ -516,6 +523,21 @@ def parse_geosearch_generic(response, **options): ] +def parse_command(response, **options): + commands = {} + for command in response: + cmd_dict = {} + cmd_name = str_if_bytes(command[0]) + cmd_dict['name'] = cmd_name + cmd_dict['arity'] = int(command[1]) + cmd_dict['flags'] = [str_if_bytes(flag) for flag in command[2]] + cmd_dict['first_key_pos'] = command[3] + cmd_dict['last_key_pos'] = command[4] + cmd_dict['step_count'] = command[5] + commands[cmd_name] = cmd_dict + return commands + + def parse_pubsub_numsub(response, **options): return list(zip(response[0::2], response[1::2])) @@ -607,7 +629,7 @@ def parse_set_result(response, **options): return response and str_if_bytes(response) == 'OK' -class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object): +class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): """ Implementation of the Redis protocol. @@ -704,7 +726,10 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object): 'CLUSTER SET-CONFIG-EPOCH': bool_ok, 'CLUSTER SETSLOT': bool_ok, 'CLUSTER SLAVES': parse_cluster_nodes, + 'CLUSTER REPLICAS': parse_cluster_nodes, + 'COMMAND': parse_command, 'COMMAND COUNT': int, + 'COMMAND GETKEYS': lambda r: list(map(str_if_bytes, r)), 'CONFIG GET': parse_config_get, 'CONFIG RESETSTAT': bool_ok, 'CONFIG SET': bool_ok, @@ -827,7 +852,7 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object): ssl_check_hostname=False, max_connections=None, single_connection_client=False, health_check_interval=0, client_name=None, username=None, - retry=None): + retry=None, redis_connect_func=None): """ Initialize a new Redis client. To specify a retry policy, first set `retry_on_timeout` to `True` @@ -855,7 +880,8 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object): 'retry': copy.deepcopy(retry), 'max_connections': max_connections, 'health_check_interval': health_check_interval, - 'client_name': client_name + 'client_name': client_name, + 'redis_connect_func': redis_connect_func } # based on input, setup appropriate connection args if unix_socket_path is not None: @@ -892,7 +918,7 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object): self.__class__.RESPONSE_CALLBACKS) def __repr__(self): - return "%s<%s>" % (type(self).__name__, repr(self.connection_pool)) + return f"{type(self).__name__}<{repr(self.connection_pool)}>" def set_response_callback(self, command, callback): "Set a custom Response Callback" @@ -1081,7 +1107,10 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object): def parse_response(self, connection, command_name, **options): "Parses a response from the Redis server" try: - response = connection.read_response() + if NEVER_DECODE in options: + response = connection.read_response(disable_decoding=True) + else: + response = connection.read_response() except ResponseError: if EMPTY_RESPONSE in options: return options[EMPTY_RESPONSE] @@ -1112,7 +1141,7 @@ class Monitor: # check that monitor returns 'OK', but don't return it to user response = self.connection.read_response() if not bool_ok(response): - raise RedisError('MONITOR failed: %s' % response) + raise RedisError(f'MONITOR failed: {response}') return self def __exit__(self, *args): @@ -1173,14 +1202,16 @@ class PubSub: HEALTH_CHECK_MESSAGE = 'redis-py-health-check' def __init__(self, connection_pool, shard_hint=None, - ignore_subscribe_messages=False): + ignore_subscribe_messages=False, encoder=None): self.connection_pool = connection_pool self.shard_hint = shard_hint self.ignore_subscribe_messages = ignore_subscribe_messages self.connection = None # we need to know the encoding options for this connection in order # to lookup channel and pattern names for callback handlers. - self.encoder = self.connection_pool.get_encoder() + self.encoder = encoder + if self.encoder is None: + self.encoder = self.connection_pool.get_encoder() if self.encoder.decode_responses: self.health_check_response = ['pong', self.HEALTH_CHECK_MESSAGE] else: @@ -1486,12 +1517,10 @@ class PubSub: exception_handler=None): for channel, handler in self.channels.items(): if handler is None: - raise PubSubError("Channel: '%s' has no handler registered" % - channel) + raise PubSubError(f"Channel: '{channel}' has no handler registered") for pattern, handler in self.patterns.items(): if handler is None: - raise PubSubError("Pattern: '%s' has no handler registered" % - pattern) + raise PubSubError(f"Pattern: '{pattern}' has no handler registered") thread = PubSubWorkerThread( self, @@ -1776,8 +1805,10 @@ class Pipeline(Redis): def annotate_exception(self, exception, number, command): cmd = ' '.join(map(safe_str, command)) - msg = 'Command # %d (%s) of pipeline caused error: %s' % ( - number, cmd, exception.args[0]) + msg = ( + f'Command # {number} ({cmd}) of pipeline ' + f'caused error: {exception.args[0]}' + ) exception.args = (msg,) + exception.args[1:] def parse_response(self, connection, command_name, **options): @@ -1863,37 +1894,3 @@ class Pipeline(Redis): def unwatch(self): "Unwatches all previously specified keys" return self.watching and self.execute_command('UNWATCH') or True - - -class Script: - "An executable Lua script object returned by ``register_script``" - - def __init__(self, registered_client, script): - self.registered_client = registered_client - self.script = script - # Precalculate and store the SHA1 hex digest of the script. - - if isinstance(script, str): - # We need the encoding from the client in order to generate an - # accurate byte representation of the script - encoder = registered_client.connection_pool.get_encoder() - script = encoder.encode(script) - self.sha = hashlib.sha1(script).hexdigest() - - def __call__(self, keys=[], args=[], client=None): - "Execute the script, passing any required ``args``" - if client is None: - client = self.registered_client - args = tuple(keys) + tuple(args) - # make sure the Redis server knows about the script - if isinstance(client, Pipeline): - # Make sure the pipeline can register the script before executing. - client.scripts.add(self) - try: - return client.evalsha(self.sha, len(keys), *args) - except NoScriptError: - # Maybe the client is pointed to a different server than the client - # that created this instance? - # Overwrite the sha just in case there was a discrepancy. - self.sha = client.script_load(self.script) - return client.evalsha(self.sha, len(keys), *args) |