From bac33d4a92892ca7982b461347151bff5a661f0d Mon Sep 17 00:00:00 2001 From: Utkarsh Gupta Date: Mon, 30 May 2022 21:45:45 +0530 Subject: async_cluster: add pipeline support (#2199) Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com> --- redis/cluster.py | 114 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 78 insertions(+), 36 deletions(-) (limited to 'redis/cluster.py') diff --git a/redis/cluster.py b/redis/cluster.py index 46a96a6..2e31063 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -6,7 +6,7 @@ import sys import threading import time from collections import OrderedDict -from typing import Any, Dict, Tuple +from typing import Any, Callable, Dict, Tuple from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan from redis.commands import CommandsParser, RedisClusterCommands @@ -2130,7 +2130,7 @@ class ClusterPipeline(RedisCluster): return self.execute_command("DEL", names[0]) -def block_pipeline_command(func): +def block_pipeline_command(name: str) -> Callable[..., Any]: """ Prints error because some pipelined commands should be blocked when running in cluster-mode @@ -2138,7 +2138,7 @@ def block_pipeline_command(func): def inner(*args, **kwargs): raise RedisClusterException( - f"ERROR: Calling pipelined function {func.__name__} is blocked " + f"ERROR: Calling pipelined function {name} is blocked " f"when running redis in cluster mode..." ) @@ -2146,39 +2146,81 @@ def block_pipeline_command(func): # Blocked pipeline commands -ClusterPipeline.bitop = block_pipeline_command(RedisCluster.bitop) -ClusterPipeline.brpoplpush = block_pipeline_command(RedisCluster.brpoplpush) -ClusterPipeline.client_getname = block_pipeline_command(RedisCluster.client_getname) -ClusterPipeline.client_list = block_pipeline_command(RedisCluster.client_list) -ClusterPipeline.client_setname = block_pipeline_command(RedisCluster.client_setname) -ClusterPipeline.config_set = block_pipeline_command(RedisCluster.config_set) -ClusterPipeline.dbsize = block_pipeline_command(RedisCluster.dbsize) -ClusterPipeline.flushall = block_pipeline_command(RedisCluster.flushall) -ClusterPipeline.flushdb = block_pipeline_command(RedisCluster.flushdb) -ClusterPipeline.keys = block_pipeline_command(RedisCluster.keys) -ClusterPipeline.mget = block_pipeline_command(RedisCluster.mget) -ClusterPipeline.move = block_pipeline_command(RedisCluster.move) -ClusterPipeline.mset = block_pipeline_command(RedisCluster.mset) -ClusterPipeline.msetnx = block_pipeline_command(RedisCluster.msetnx) -ClusterPipeline.pfmerge = block_pipeline_command(RedisCluster.pfmerge) -ClusterPipeline.pfcount = block_pipeline_command(RedisCluster.pfcount) -ClusterPipeline.ping = block_pipeline_command(RedisCluster.ping) -ClusterPipeline.publish = block_pipeline_command(RedisCluster.publish) -ClusterPipeline.randomkey = block_pipeline_command(RedisCluster.randomkey) -ClusterPipeline.rename = block_pipeline_command(RedisCluster.rename) -ClusterPipeline.renamenx = block_pipeline_command(RedisCluster.renamenx) -ClusterPipeline.rpoplpush = block_pipeline_command(RedisCluster.rpoplpush) -ClusterPipeline.scan = block_pipeline_command(RedisCluster.scan) -ClusterPipeline.sdiff = block_pipeline_command(RedisCluster.sdiff) -ClusterPipeline.sdiffstore = block_pipeline_command(RedisCluster.sdiffstore) -ClusterPipeline.sinter = block_pipeline_command(RedisCluster.sinter) -ClusterPipeline.sinterstore = block_pipeline_command(RedisCluster.sinterstore) -ClusterPipeline.smove = block_pipeline_command(RedisCluster.smove) -ClusterPipeline.sort = block_pipeline_command(RedisCluster.sort) -ClusterPipeline.sunion = block_pipeline_command(RedisCluster.sunion) -ClusterPipeline.sunionstore = block_pipeline_command(RedisCluster.sunionstore) -ClusterPipeline.readwrite = block_pipeline_command(RedisCluster.readwrite) -ClusterPipeline.readonly = block_pipeline_command(RedisCluster.readonly) +PIPELINE_BLOCKED_COMMANDS = ( + "BGREWRITEAOF", + "BGSAVE", + "BITOP", + "BRPOPLPUSH", + "CLIENT GETNAME", + "CLIENT KILL", + "CLIENT LIST", + "CLIENT SETNAME", + "CLIENT", + "CONFIG GET", + "CONFIG RESETSTAT", + "CONFIG REWRITE", + "CONFIG SET", + "CONFIG", + "DBSIZE", + "ECHO", + "EVALSHA", + "FLUSHALL", + "FLUSHDB", + "INFO", + "KEYS", + "LASTSAVE", + "MGET", + "MGET NONATOMIC", + "MOVE", + "MSET", + "MSET NONATOMIC", + "MSETNX", + "PFCOUNT", + "PFMERGE", + "PING", + "PUBLISH", + "RANDOMKEY", + "READONLY", + "READWRITE", + "RENAME", + "RENAMENX", + "RPOPLPUSH", + "SAVE", + "SCAN", + "SCRIPT EXISTS", + "SCRIPT FLUSH", + "SCRIPT KILL", + "SCRIPT LOAD", + "SCRIPT", + "SDIFF", + "SDIFFSTORE", + "SENTINEL GET MASTER ADDR BY NAME", + "SENTINEL MASTER", + "SENTINEL MASTERS", + "SENTINEL MONITOR", + "SENTINEL REMOVE", + "SENTINEL SENTINELS", + "SENTINEL SET", + "SENTINEL SLAVES", + "SENTINEL", + "SHUTDOWN", + "SINTER", + "SINTERSTORE", + "SLAVEOF", + "SLOWLOG GET", + "SLOWLOG LEN", + "SLOWLOG RESET", + "SLOWLOG", + "SMOVE", + "SORT", + "SUNION", + "SUNIONSTORE", + "TIME", +) +for command in PIPELINE_BLOCKED_COMMANDS: + command = command.replace(" ", "_").lower() + + setattr(ClusterPipeline, command, block_pipeline_command(command)) class PipelineCommand: -- cgit v1.2.1