diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-07-24 18:05:01 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-24 15:35:01 +0300 |
commit | ae171d16e173c367256b1da42f66947fd3c6d1ea (patch) | |
tree | 54af2cbdadfb397a102483ba0add5b432854d777 /redis/asyncio/cluster.py | |
parent | a304953ed57c81a008a0217df375239a85ad7a04 (diff) | |
download | redis-py-ae171d16e173c367256b1da42f66947fd3c6d1ea.tar.gz |
async_cluster: fix concurrent pipeline (#2280)
- each pipeline should create separate stacks for each node
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r-- | redis/asyncio/cluster.py | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 2894004..3fe3ebc 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -755,7 +755,6 @@ class ClusterNode: """ __slots__ = ( - "_command_stack", "_connections", "_free", "connection_class", @@ -796,7 +795,6 @@ class ClusterNode: self._connections: List[Connection] = [] self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections) - self._command_stack: List["PipelineCommand"] = [] def __repr__(self) -> str: return ( @@ -887,18 +885,18 @@ class ClusterNode: # Release connection self._free.append(connection) - async def execute_pipeline(self) -> bool: + async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: # Acquire connection connection = self.acquire_connection() # Execute command await connection.send_packed_command( - connection.pack_commands(cmd.args for cmd in self._command_stack), False + connection.pack_commands(cmd.args for cmd in commands), False ) # Read responses ret = False - for cmd in self._command_stack: + for cmd in commands: try: cmd.result = await self.parse_response( connection, cmd.args[0], **cmd.kwargs @@ -1365,12 +1363,14 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm node = target_nodes[0] if node.name not in nodes: - nodes[node.name] = node - node._command_stack = [] - node._command_stack.append(cmd) + nodes[node.name] = (node, []) + nodes[node.name][1].append(cmd) errors = await asyncio.gather( - *(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values()) + *( + asyncio.ensure_future(node[0].execute_pipeline(node[1])) + for node in nodes.values() + ) ) if any(errors): |