summaryrefslogtreecommitdiff
path: root/redis/asyncio/parser.py
diff options
context:
space:
mode:
authorUtkarsh Gupta <utkarshgupta137@gmail.com>2022-05-08 17:34:20 +0530
committerGitHub <noreply@github.com>2022-05-08 15:04:20 +0300
commit061d97abe21d3a8ce9738330cabf771dd05c8dc1 (patch)
treee64d64c5917312a65304f4d630775d08adb38bfa /redis/asyncio/parser.py
parentc25be04d6468163d31908774ed358d3fd6bc0a39 (diff)
downloadredis-py-061d97abe21d3a8ce9738330cabf771dd05c8dc1.tar.gz
Add Async RedisCluster (#2099)
* Copy Cluster Client, Commands, Commands Parser, Tests for asyncio * Async Cluster Tests: Async/Await * Add Async RedisCluster * cluster: use ERRORS_ALLOW_RETRY from self.__class__ * async_cluster: rework redis_connection, initialize, & close - move redis_connection from NodesManager to ClusterNode & handle all related logic in ClusterNode class - use Locks while initializing or closing - in case of error, close connections instead of instantly reinitializing - create ResourceWarning instead of manually deleting client object - use asyncio.gather to run commands/initialize/close in parallel - inline single use functions - fix test_acl_log for py3.6 * async_cluster: add types * async_cluster: add docs * docs: update sphinx & add sphinx_autodoc_typehints * async_cluster: move TargetNodesT to cluster module * async_cluster/commands: inherit commands from sync class if possible * async_cluster: add benchmark script with aredis & aioredis-cluster * async_cluster: remove logging * async_cluster: inline functions * async_cluster: manage Connection instead of Redis Client * async_cluster/commands: optimize parser * async_cluster: use ensure_future & generators for gather * async_conn: optimize * async_cluster: optimize determine_slot * async_cluster: optimize determine_nodes * async_cluster/parser: optimize _get_moveable_keys * async_cluster: inlined check_slots_coverage * async_cluster: update docstrings * async_cluster: add concurrent test & use read_response/_update_moved_slots without lock Co-authored-by: Chayim <chayim@users.noreply.github.com>
Diffstat (limited to 'redis/asyncio/parser.py')
-rw-r--r--redis/asyncio/parser.py95
1 files changed, 95 insertions, 0 deletions
diff --git a/redis/asyncio/parser.py b/redis/asyncio/parser.py
new file mode 100644
index 0000000..273fe03
--- /dev/null
+++ b/redis/asyncio/parser.py
@@ -0,0 +1,95 @@
+from typing import TYPE_CHECKING, List, Optional, Union
+
+from redis.exceptions import RedisError, ResponseError
+
+if TYPE_CHECKING:
+ from redis.asyncio.cluster import ClusterNode
+
+
+class CommandsParser:
+ """
+ Parses Redis commands to get command keys.
+
+ COMMAND output is used to determine key locations.
+ Commands that do not have a predefined key location are flagged with 'movablekeys',
+ and these commands' keys are determined by the command 'COMMAND GETKEYS'.
+
+ NOTE: Due to a bug in redis<7.0, this does not work properly
+ for EVAL or EVALSHA when the `numkeys` arg is 0.
+ - issue: https://github.com/redis/redis/issues/9493
+ - fix: https://github.com/redis/redis/pull/9733
+
+ So, don't use this with EVAL or EVALSHA.
+ """
+
+ __slots__ = ("commands",)
+
+ def __init__(self) -> None:
+ self.commands = {}
+
+ async def initialize(self, r: "ClusterNode") -> None:
+ commands = await r.execute_command("COMMAND")
+ for cmd, command in commands.items():
+ if "movablekeys" in command["flags"]:
+ commands[cmd] = -1
+ elif command["first_key_pos"] == 0 and command["last_key_pos"] == 0:
+ commands[cmd] = 0
+ elif command["first_key_pos"] == 1 and command["last_key_pos"] == 1:
+ commands[cmd] = 1
+ self.commands = {cmd.upper(): command for cmd, command in commands.items()}
+
+ # As soon as this PR is merged into Redis, we should reimplement
+ # our logic to use COMMAND INFO changes to determine the key positions
+ # https://github.com/redis/redis/pull/8324
+ async def get_keys(
+ self, redis_conn: "ClusterNode", *args
+ ) -> Optional[Union[List[str], List[bytes]]]:
+ if len(args) < 2:
+ # The command has no keys in it
+ return None
+
+ try:
+ command = self.commands[args[0]]
+ except KeyError:
+ # try to split the command name and to take only the main command
+ # e.g. 'memory' for 'memory usage'
+ args = args[0].split() + list(args[1:])
+ cmd_name = args[0]
+ if cmd_name not in self.commands:
+ # We'll try to reinitialize the commands cache, if the engine
+ # version has changed, the commands may not be current
+ await self.initialize(redis_conn)
+ if cmd_name not in self.commands:
+ raise RedisError(
+ f"{cmd_name.upper()} command doesn't exist in Redis commands"
+ )
+
+ command = self.commands[cmd_name]
+
+ if command == 1:
+ return [args[1]]
+ if command == 0:
+ return None
+ if command == -1:
+ return await self._get_moveable_keys(redis_conn, *args)
+
+ last_key_pos = command["last_key_pos"]
+ if last_key_pos < 0:
+ last_key_pos = len(args) + last_key_pos
+ return args[command["first_key_pos"] : last_key_pos + 1 : command["step_count"]]
+
+ async def _get_moveable_keys(
+ self, redis_conn: "ClusterNode", *args
+ ) -> Optional[List[str]]:
+ try:
+ keys = await redis_conn.execute_command("COMMAND GETKEYS", *args)
+ except ResponseError as e:
+ message = e.__str__()
+ if (
+ "Invalid arguments" in message
+ or "The command has no key arguments" in message
+ ):
+ return None
+ else:
+ raise e
+ return keys