summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py101
1 files changed, 49 insertions, 52 deletions
diff --git a/redis/client.py b/redis/client.py
index dc6693d..9f2907e 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -1,7 +1,6 @@
from itertools import chain
import copy
import datetime
-import hashlib
import re
import threading
import time
@@ -15,7 +14,6 @@ from redis.exceptions import (
ConnectionError,
ExecAbortError,
ModuleError,
- NoScriptError,
PubSubError,
RedisError,
ResponseError,
@@ -27,6 +25,9 @@ from redis.utils import safe_str, str_if_bytes
SYM_EMPTY = b''
EMPTY_RESPONSE = 'EMPTY_RESPONSE'
+# some responses (ie. dump) are binary, and just meant to never be decoded
+NEVER_DECODE = 'NEVER_DECODE'
+
def timestamp_to_datetime(response):
"Converts a unix timestamp to a Python datetime object"
@@ -461,6 +462,7 @@ def _parse_node_line(line):
line_items = line.split(' ')
node_id, addr, flags, master_id, ping, pong, epoch, \
connected = line.split(' ')[:8]
+ addr = addr.split('@')[0]
slots = [sl.split('-') for sl in line_items[8:]]
node_dict = {
'node_id': node_id,
@@ -476,8 +478,13 @@ def _parse_node_line(line):
def parse_cluster_nodes(response, **options):
- raw_lines = str_if_bytes(response).splitlines()
- return dict(_parse_node_line(line) for line in raw_lines)
+ """
+ @see: https://redis.io/commands/cluster-nodes # string
+ @see: https://redis.io/commands/cluster-replicas # list of string
+ """
+ if isinstance(response, str):
+ response = response.splitlines()
+ return dict(_parse_node_line(str_if_bytes(node)) for node in response)
def parse_geosearch_generic(response, **options):
@@ -516,6 +523,21 @@ def parse_geosearch_generic(response, **options):
]
+def parse_command(response, **options):
+ commands = {}
+ for command in response:
+ cmd_dict = {}
+ cmd_name = str_if_bytes(command[0])
+ cmd_dict['name'] = cmd_name
+ cmd_dict['arity'] = int(command[1])
+ cmd_dict['flags'] = [str_if_bytes(flag) for flag in command[2]]
+ cmd_dict['first_key_pos'] = command[3]
+ cmd_dict['last_key_pos'] = command[4]
+ cmd_dict['step_count'] = command[5]
+ commands[cmd_name] = cmd_dict
+ return commands
+
+
def parse_pubsub_numsub(response, **options):
return list(zip(response[0::2], response[1::2]))
@@ -607,7 +629,7 @@ def parse_set_result(response, **options):
return response and str_if_bytes(response) == 'OK'
-class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object):
+class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
"""
Implementation of the Redis protocol.
@@ -704,7 +726,10 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object):
'CLUSTER SET-CONFIG-EPOCH': bool_ok,
'CLUSTER SETSLOT': bool_ok,
'CLUSTER SLAVES': parse_cluster_nodes,
+ 'CLUSTER REPLICAS': parse_cluster_nodes,
+ 'COMMAND': parse_command,
'COMMAND COUNT': int,
+ 'COMMAND GETKEYS': lambda r: list(map(str_if_bytes, r)),
'CONFIG GET': parse_config_get,
'CONFIG RESETSTAT': bool_ok,
'CONFIG SET': bool_ok,
@@ -827,7 +852,7 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object):
ssl_check_hostname=False,
max_connections=None, single_connection_client=False,
health_check_interval=0, client_name=None, username=None,
- retry=None):
+ retry=None, redis_connect_func=None):
"""
Initialize a new Redis client.
To specify a retry policy, first set `retry_on_timeout` to `True`
@@ -855,7 +880,8 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object):
'retry': copy.deepcopy(retry),
'max_connections': max_connections,
'health_check_interval': health_check_interval,
- 'client_name': client_name
+ 'client_name': client_name,
+ 'redis_connect_func': redis_connect_func
}
# based on input, setup appropriate connection args
if unix_socket_path is not None:
@@ -892,7 +918,7 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object):
self.__class__.RESPONSE_CALLBACKS)
def __repr__(self):
- return "%s<%s>" % (type(self).__name__, repr(self.connection_pool))
+ return f"{type(self).__name__}<{repr(self.connection_pool)}>"
def set_response_callback(self, command, callback):
"Set a custom Response Callback"
@@ -1081,7 +1107,10 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object):
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
try:
- response = connection.read_response()
+ if NEVER_DECODE in options:
+ response = connection.read_response(disable_decoding=True)
+ else:
+ response = connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in options:
return options[EMPTY_RESPONSE]
@@ -1112,7 +1141,7 @@ class Monitor:
# check that monitor returns 'OK', but don't return it to user
response = self.connection.read_response()
if not bool_ok(response):
- raise RedisError('MONITOR failed: %s' % response)
+ raise RedisError(f'MONITOR failed: {response}')
return self
def __exit__(self, *args):
@@ -1173,14 +1202,16 @@ class PubSub:
HEALTH_CHECK_MESSAGE = 'redis-py-health-check'
def __init__(self, connection_pool, shard_hint=None,
- ignore_subscribe_messages=False):
+ ignore_subscribe_messages=False, encoder=None):
self.connection_pool = connection_pool
self.shard_hint = shard_hint
self.ignore_subscribe_messages = ignore_subscribe_messages
self.connection = None
# we need to know the encoding options for this connection in order
# to lookup channel and pattern names for callback handlers.
- self.encoder = self.connection_pool.get_encoder()
+ self.encoder = encoder
+ if self.encoder is None:
+ self.encoder = self.connection_pool.get_encoder()
if self.encoder.decode_responses:
self.health_check_response = ['pong', self.HEALTH_CHECK_MESSAGE]
else:
@@ -1486,12 +1517,10 @@ class PubSub:
exception_handler=None):
for channel, handler in self.channels.items():
if handler is None:
- raise PubSubError("Channel: '%s' has no handler registered" %
- channel)
+ raise PubSubError(f"Channel: '{channel}' has no handler registered")
for pattern, handler in self.patterns.items():
if handler is None:
- raise PubSubError("Pattern: '%s' has no handler registered" %
- pattern)
+ raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
thread = PubSubWorkerThread(
self,
@@ -1776,8 +1805,10 @@ class Pipeline(Redis):
def annotate_exception(self, exception, number, command):
cmd = ' '.join(map(safe_str, command))
- msg = 'Command # %d (%s) of pipeline caused error: %s' % (
- number, cmd, exception.args[0])
+ msg = (
+ f'Command # {number} ({cmd}) of pipeline '
+ f'caused error: {exception.args[0]}'
+ )
exception.args = (msg,) + exception.args[1:]
def parse_response(self, connection, command_name, **options):
@@ -1863,37 +1894,3 @@ class Pipeline(Redis):
def unwatch(self):
"Unwatches all previously specified keys"
return self.watching and self.execute_command('UNWATCH') or True
-
-
-class Script:
- "An executable Lua script object returned by ``register_script``"
-
- def __init__(self, registered_client, script):
- self.registered_client = registered_client
- self.script = script
- # Precalculate and store the SHA1 hex digest of the script.
-
- if isinstance(script, str):
- # We need the encoding from the client in order to generate an
- # accurate byte representation of the script
- encoder = registered_client.connection_pool.get_encoder()
- script = encoder.encode(script)
- self.sha = hashlib.sha1(script).hexdigest()
-
- def __call__(self, keys=[], args=[], client=None):
- "Execute the script, passing any required ``args``"
- if client is None:
- client = self.registered_client
- args = tuple(keys) + tuple(args)
- # make sure the Redis server knows about the script
- if isinstance(client, Pipeline):
- # Make sure the pipeline can register the script before executing.
- client.scripts.add(self)
- try:
- return client.evalsha(self.sha, len(keys), *args)
- except NoScriptError:
- # Maybe the client is pointed to a different server than the client
- # that created this instance?
- # Overwrite the sha just in case there was a discrepancy.
- self.sha = client.script_load(self.script)
- return client.evalsha(self.sha, len(keys), *args)