diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-05-08 17:34:20 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-08 15:04:20 +0300 |
commit | 061d97abe21d3a8ce9738330cabf771dd05c8dc1 (patch) | |
tree | e64d64c5917312a65304f4d630775d08adb38bfa /redis/asyncio/parser.py | |
parent | c25be04d6468163d31908774ed358d3fd6bc0a39 (diff) | |
download | redis-py-061d97abe21d3a8ce9738330cabf771dd05c8dc1.tar.gz |
Add Async RedisCluster (#2099)
* Copy Cluster Client, Commands, Commands Parser, Tests for asyncio
* Async Cluster Tests: Async/Await
* Add Async RedisCluster
* cluster: use ERRORS_ALLOW_RETRY from self.__class__
* async_cluster: rework redis_connection, initialize, & close
- move redis_connection from NodesManager to ClusterNode & handle all related logic in ClusterNode class
- use Locks while initializing or closing
- in case of error, close connections instead of instantly reinitializing
- create ResourceWarning instead of manually deleting client object
- use asyncio.gather to run commands/initialize/close in parallel
- inline single use functions
- fix test_acl_log for py3.6
* async_cluster: add types
* async_cluster: add docs
* docs: update sphinx & add sphinx_autodoc_typehints
* async_cluster: move TargetNodesT to cluster module
* async_cluster/commands: inherit commands from sync class if possible
* async_cluster: add benchmark script with aredis & aioredis-cluster
* async_cluster: remove logging
* async_cluster: inline functions
* async_cluster: manage Connection instead of Redis Client
* async_cluster/commands: optimize parser
* async_cluster: use ensure_future & generators for gather
* async_conn: optimize
* async_cluster: optimize determine_slot
* async_cluster: optimize determine_nodes
* async_cluster/parser: optimize _get_moveable_keys
* async_cluster: inlined check_slots_coverage
* async_cluster: update docstrings
* async_cluster: add concurrent test & use read_response/_update_moved_slots without lock
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Diffstat (limited to 'redis/asyncio/parser.py')
-rw-r--r-- | redis/asyncio/parser.py | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/redis/asyncio/parser.py b/redis/asyncio/parser.py new file mode 100644 index 0000000..273fe03 --- /dev/null +++ b/redis/asyncio/parser.py @@ -0,0 +1,95 @@ +from typing import TYPE_CHECKING, List, Optional, Union + +from redis.exceptions import RedisError, ResponseError + +if TYPE_CHECKING: + from redis.asyncio.cluster import ClusterNode + + +class CommandsParser: + """ + Parses Redis commands to get command keys. + + COMMAND output is used to determine key locations. + Commands that do not have a predefined key location are flagged with 'movablekeys', + and these commands' keys are determined by the command 'COMMAND GETKEYS'. + + NOTE: Due to a bug in redis<7.0, this does not work properly + for EVAL or EVALSHA when the `numkeys` arg is 0. + - issue: https://github.com/redis/redis/issues/9493 + - fix: https://github.com/redis/redis/pull/9733 + + So, don't use this with EVAL or EVALSHA. + """ + + __slots__ = ("commands",) + + def __init__(self) -> None: + self.commands = {} + + async def initialize(self, r: "ClusterNode") -> None: + commands = await r.execute_command("COMMAND") + for cmd, command in commands.items(): + if "movablekeys" in command["flags"]: + commands[cmd] = -1 + elif command["first_key_pos"] == 0 and command["last_key_pos"] == 0: + commands[cmd] = 0 + elif command["first_key_pos"] == 1 and command["last_key_pos"] == 1: + commands[cmd] = 1 + self.commands = {cmd.upper(): command for cmd, command in commands.items()} + + # As soon as this PR is merged into Redis, we should reimplement + # our logic to use COMMAND INFO changes to determine the key positions + # https://github.com/redis/redis/pull/8324 + async def get_keys( + self, redis_conn: "ClusterNode", *args + ) -> Optional[Union[List[str], List[bytes]]]: + if len(args) < 2: + # The command has no keys in it + return None + + try: + command = self.commands[args[0]] + except KeyError: + # try to split the command name and to take only the main command + # e.g. 'memory' for 'memory usage' + args = args[0].split() + list(args[1:]) + cmd_name = args[0] + if cmd_name not in self.commands: + # We'll try to reinitialize the commands cache, if the engine + # version has changed, the commands may not be current + await self.initialize(redis_conn) + if cmd_name not in self.commands: + raise RedisError( + f"{cmd_name.upper()} command doesn't exist in Redis commands" + ) + + command = self.commands[cmd_name] + + if command == 1: + return [args[1]] + if command == 0: + return None + if command == -1: + return await self._get_moveable_keys(redis_conn, *args) + + last_key_pos = command["last_key_pos"] + if last_key_pos < 0: + last_key_pos = len(args) + last_key_pos + return args[command["first_key_pos"] : last_key_pos + 1 : command["step_count"]] + + async def _get_moveable_keys( + self, redis_conn: "ClusterNode", *args + ) -> Optional[List[str]]: + try: + keys = await redis_conn.execute_command("COMMAND GETKEYS", *args) + except ResponseError as e: + message = e.__str__() + if ( + "Invalid arguments" in message + or "The command has no key arguments" in message + ): + return None + else: + raise e + return keys |