summaryrefslogtreecommitdiff
path: root/redis/asyncio/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r--redis/asyncio/cluster.py18
1 files changed, 9 insertions, 9 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 2894004..3fe3ebc 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -755,7 +755,6 @@ class ClusterNode:
"""
__slots__ = (
- "_command_stack",
"_connections",
"_free",
"connection_class",
@@ -796,7 +795,6 @@ class ClusterNode:
self._connections: List[Connection] = []
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
- self._command_stack: List["PipelineCommand"] = []
def __repr__(self) -> str:
return (
@@ -887,18 +885,18 @@ class ClusterNode:
# Release connection
self._free.append(connection)
- async def execute_pipeline(self) -> bool:
+ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()
# Execute command
await connection.send_packed_command(
- connection.pack_commands(cmd.args for cmd in self._command_stack), False
+ connection.pack_commands(cmd.args for cmd in commands), False
)
# Read responses
ret = False
- for cmd in self._command_stack:
+ for cmd in commands:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
@@ -1365,12 +1363,14 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
node = target_nodes[0]
if node.name not in nodes:
- nodes[node.name] = node
- node._command_stack = []
- node._command_stack.append(cmd)
+ nodes[node.name] = (node, [])
+ nodes[node.name][1].append(cmd)
errors = await asyncio.gather(
- *(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
+ *(
+ asyncio.ensure_future(node[0].execute_pipeline(node[1]))
+ for node in nodes.values()
+ )
)
if any(errors):