summaryrefslogtreecommitdiff
path: root/redis/asyncio/cluster.py
diff options
context:
space:
mode:
authorUtkarsh Gupta <utkarshgupta137@gmail.com>2022-07-24 18:05:01 +0530
committerGitHub <noreply@github.com>2022-07-24 15:35:01 +0300
commitae171d16e173c367256b1da42f66947fd3c6d1ea (patch)
tree54af2cbdadfb397a102483ba0add5b432854d777 /redis/asyncio/cluster.py
parenta304953ed57c81a008a0217df375239a85ad7a04 (diff)
downloadredis-py-ae171d16e173c367256b1da42f66947fd3c6d1ea.tar.gz
async_cluster: fix concurrent pipeline (#2280)
- each pipeline should create separate stacks for each node
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r--redis/asyncio/cluster.py18
1 files changed, 9 insertions, 9 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 2894004..3fe3ebc 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -755,7 +755,6 @@ class ClusterNode:
"""
__slots__ = (
- "_command_stack",
"_connections",
"_free",
"connection_class",
@@ -796,7 +795,6 @@ class ClusterNode:
self._connections: List[Connection] = []
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
- self._command_stack: List["PipelineCommand"] = []
def __repr__(self) -> str:
return (
@@ -887,18 +885,18 @@ class ClusterNode:
# Release connection
self._free.append(connection)
- async def execute_pipeline(self) -> bool:
+ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()
# Execute command
await connection.send_packed_command(
- connection.pack_commands(cmd.args for cmd in self._command_stack), False
+ connection.pack_commands(cmd.args for cmd in commands), False
)
# Read responses
ret = False
- for cmd in self._command_stack:
+ for cmd in commands:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
@@ -1365,12 +1363,14 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
node = target_nodes[0]
if node.name not in nodes:
- nodes[node.name] = node
- node._command_stack = []
- node._command_stack.append(cmd)
+ nodes[node.name] = (node, [])
+ nodes[node.name][1].append(cmd)
errors = await asyncio.gather(
- *(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
+ *(
+ asyncio.ensure_future(node[0].execute_pipeline(node[1]))
+ for node in nodes.values()
+ )
)
if any(errors):