diff options
-rw-r--r-- | redis/asyncio/cluster.py | 8 | ||||
-rw-r--r-- | redis/asyncio/connection.py | 35 | ||||
-rw-r--r-- | tests/test_asyncio/test_cluster.py | 48 | ||||
-rw-r--r-- | tests/test_asyncio/test_connection.py | 4 |
4 files changed, 29 insertions, 66 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 78dcd05..8d34b9a 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -390,7 +390,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand # regardless of the server type. If this is a primary connection, # READONLY would not affect executing write commands. await connection.send_command("READONLY") - if str_if_bytes(await connection.read_response_without_lock()) != "OK": + if str_if_bytes(await connection.read_response()) != "OK": raise ConnectionError("READONLY command failed") def get_nodes(self) -> List["ClusterNode"]: @@ -866,11 +866,9 @@ class ClusterNode: ) -> Any: try: if NEVER_DECODE in kwargs: - response = await connection.read_response_without_lock( - disable_decoding=True - ) + response = await connection.read_response(disable_decoding=True) else: - response = await connection.read_response_without_lock() + response = await connection.read_response() except ResponseError: if EMPTY_RESPONSE in kwargs: return kwargs[EMPTY_RESPONSE] diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index db8c240..16f33e2 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -671,7 +671,6 @@ class Connection: self.set_parser(parser_class) self._connect_callbacks: List[weakref.WeakMethod[ConnectCallbackT]] = [] self._buffer_cutoff = 6000 - self._lock = asyncio.Lock() def __repr__(self): repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces())) @@ -942,39 +941,6 @@ class Connection: async def read_response(self, disable_decoding: bool = False): """Read the response from a previously sent command""" try: - async with self._lock: - if self.socket_timeout: - async with async_timeout.timeout(self.socket_timeout): - response = await self._parser.read_response( - disable_decoding=disable_decoding - ) - else: - response = await self._parser.read_response( - disable_decoding=disable_decoding - ) - except asyncio.TimeoutError: - await self.disconnect() - raise TimeoutError(f"Timeout reading from {self.host}:{self.port}") - except OSError as e: - await self.disconnect() - raise ConnectionError( - f"Error while reading from {self.host}:{self.port} : {e.args}" - ) - except BaseException: - await self.disconnect() - raise - - if self.health_check_interval: - next_time = asyncio.get_running_loop().time() + self.health_check_interval - self.next_health_check = next_time - - if isinstance(response, ResponseError): - raise response from None - return response - - async def read_response_without_lock(self, disable_decoding: bool = False): - """Read the response from a previously sent command""" - try: if self.socket_timeout: async with async_timeout.timeout(self.socket_timeout): response = await self._parser.read_response( @@ -1241,7 +1207,6 @@ class UnixDomainSocketConnection(Connection): # lgtm [py/missing-call-to-init] self.set_parser(parser_class) self._connect_callbacks = [] self._buffer_cutoff = 6000 - self._lock = asyncio.Lock() def repr_pieces(self) -> Iterable[Tuple[str, Union[str, int]]]: pieces = [("path", self.path), ("db", self.db)] diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index e299395..88cfb1f 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -120,7 +120,7 @@ async def get_mocked_redis_client(*args, **kwargs) -> RedisCluster: def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode: connection = mock.AsyncMock() connection.is_connected = True - connection.read_response_without_lock.return_value = response + connection.read_response.return_value = response while node._free: node._free.pop() node._free.append(connection) @@ -130,7 +130,7 @@ def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode: def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode: connection = mock.AsyncMock() connection.is_connected = True - connection.read_response_without_lock.side_effect = exc + connection.read_response.side_effect = exc while node._free: node._free.pop() node._free.append(connection) @@ -275,16 +275,12 @@ class TestRedisClusterObj: for node in rc.get_nodes(): assert node.max_connections == 10 - with mock.patch.object( - Connection, "read_response_without_lock" - ) as read_response_without_lock: + with mock.patch.object(Connection, "read_response") as read_response: - async def read_response_without_lock_mocked( - *args: Any, **kwargs: Any - ) -> None: + async def read_response_mocked(*args: Any, **kwargs: Any) -> None: await asyncio.sleep(10) - read_response_without_lock.side_effect = read_response_without_lock_mocked + read_response.side_effect = read_response_mocked with pytest.raises(MaxConnectionsError): await asyncio.gather( @@ -316,10 +312,10 @@ class TestRedisClusterObj: assert await r.ping(target_nodes=RedisCluster.PRIMARIES) is True for primary in primaries: conn = primary._free.pop() - assert conn.read_response_without_lock.called is True + assert conn.read_response.called is True for replica in replicas: conn = replica._free.pop() - assert conn.read_response_without_lock.called is not True + assert conn.read_response.called is not True async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None: """ @@ -333,10 +329,10 @@ class TestRedisClusterObj: assert await r.ping(target_nodes=RedisCluster.REPLICAS) is True for replica in replicas: conn = replica._free.pop() - assert conn.read_response_without_lock.called is True + assert conn.read_response.called is True for primary in primaries: conn = primary._free.pop() - assert conn.read_response_without_lock.called is not True + assert conn.read_response.called is not True await r.close() @@ -348,7 +344,7 @@ class TestRedisClusterObj: assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True for node in r.get_nodes(): conn = node._free.pop() - assert conn.read_response_without_lock.called is True + assert conn.read_response.called is True async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None: """ @@ -359,7 +355,7 @@ class TestRedisClusterObj: called_count = 0 for node in r.get_nodes(): conn = node._free.pop() - if conn.read_response_without_lock.called is True: + if conn.read_response.called is True: called_count += 1 assert called_count == 1 @@ -372,7 +368,7 @@ class TestRedisClusterObj: mock_node_resp(def_node, "PONG") assert await r.ping() is True conn = def_node._free.pop() - assert conn.read_response_without_lock.called + assert conn.read_response.called async def test_ask_redirection(self, r: RedisCluster) -> None: """ @@ -516,7 +512,7 @@ class TestRedisClusterObj: with mock.patch.multiple( Connection, send_command=mock.DEFAULT, - read_response_without_lock=mock.DEFAULT, + read_response=mock.DEFAULT, _connect=mock.DEFAULT, can_read=mock.DEFAULT, on_connect=mock.DEFAULT, @@ -548,7 +544,7 @@ class TestRedisClusterObj: # so we'll mock some of the Connection's functions to allow it execute_command.side_effect = execute_command_mock_first mocks["send_command"].return_value = True - mocks["read_response_without_lock"].return_value = "OK" + mocks["read_response"].return_value = "OK" mocks["_connect"].return_value = True mocks["can_read"].return_value = False mocks["on_connect"].return_value = True @@ -857,8 +853,8 @@ class TestClusterRedisCommands: node0 = r.get_node(default_host, 7000) node1 = r.get_node(default_host, 7001) assert await r.cluster_delslots(0, 8192) == [True, True] - assert node0._free.pop().read_response_without_lock.called - assert node1._free.pop().read_response_without_lock.called + assert node0._free.pop().read_response.called + assert node1._free.pop().read_response.called await r.close() @@ -1027,7 +1023,7 @@ class TestClusterRedisCommands: node = r.nodes_manager.get_node_from_slot(12182) mock_node_resp(node, "OK") assert await r.cluster_setslot_stable(12182) is True - assert node._free.pop().read_response_without_lock.called + assert node._free.pop().read_response.called @skip_if_redis_enterprise() async def test_cluster_replicas(self, r: RedisCluster) -> None: @@ -1069,7 +1065,7 @@ class TestClusterRedisCommands: for res in all_replicas_results.values(): assert res is True for replica in r.get_replicas(): - assert replica._free.pop().read_response_without_lock.called + assert replica._free.pop().read_response.called await r.close() @@ -1082,7 +1078,7 @@ class TestClusterRedisCommands: for res in all_replicas_results.values(): assert res is True for replica in r.get_replicas(): - assert replica._free.pop().read_response_without_lock.called + assert replica._free.pop().read_response.called await r.close() @@ -2441,8 +2437,8 @@ class TestClusterPipeline: mock_node_resp_exc(first_node, AskError(ask_msg)) mock_node_resp(ask_node, "MOCK_OK") res = await pipe.get(key).execute() - assert first_node._free.pop().read_response_without_lock.await_count - assert ask_node._free.pop().read_response_without_lock.await_count + assert first_node._free.pop().read_response.await_count + assert ask_node._free.pop().read_response.await_count assert res == ["MOCK_OK"] async def test_moved_redirection_on_slave_with_default( @@ -2497,7 +2493,7 @@ class TestClusterPipeline: executed_on_replica = False for node in slot_nodes: if node.server_type == REPLICA: - if node._free.pop().read_response_without_lock.await_count: + if node._free.pop().read_response.await_count: executed_on_replica = True break assert executed_on_replica diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 419c8a7..674a1b9 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -64,6 +64,10 @@ async def test_socket_param_regression(r): async def test_can_run_concurrent_commands(r): + if getattr(r, "connection", None) is not None: + # Concurrent commands are only supported on pooled or cluster connections + # since there is no synchronization on a single connection. + pytest.skip("pool only") assert await r.ping() is True assert all(await asyncio.gather(*(r.ping() for _ in range(10)))) |