summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKristján Valur Jónsson <sweskman@gmail.com>2022-09-28 13:21:19 +0000
committerGitHub <noreply@github.com>2022-09-28 16:21:19 +0300
commit6b3e0b491c4a348ca2e7a332e6f3b23a5da3c461 (patch)
treeae238a90ce3d724693c3ab4331f5ea1747871051
parent7c6a8128660d713f11d34ed4b5652ccbd9548e52 (diff)
downloadredis-py-6b3e0b491c4a348ca2e7a332e6f3b23a5da3c461.tar.gz
Dev/no lock (#2308)
* Remove async lock in asyncio.Connection.read_response * Skip concurrent-commands test on non-pooled connections
-rw-r--r--redis/asyncio/cluster.py8
-rw-r--r--redis/asyncio/connection.py35
-rw-r--r--tests/test_asyncio/test_cluster.py48
-rw-r--r--tests/test_asyncio/test_connection.py4
4 files changed, 29 insertions, 66 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 78dcd05..8d34b9a 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -390,7 +390,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
# regardless of the server type. If this is a primary connection,
# READONLY would not affect executing write commands.
await connection.send_command("READONLY")
- if str_if_bytes(await connection.read_response_without_lock()) != "OK":
+ if str_if_bytes(await connection.read_response()) != "OK":
raise ConnectionError("READONLY command failed")
def get_nodes(self) -> List["ClusterNode"]:
@@ -866,11 +866,9 @@ class ClusterNode:
) -> Any:
try:
if NEVER_DECODE in kwargs:
- response = await connection.read_response_without_lock(
- disable_decoding=True
- )
+ response = await connection.read_response(disable_decoding=True)
else:
- response = await connection.read_response_without_lock()
+ response = await connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in kwargs:
return kwargs[EMPTY_RESPONSE]
diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py
index db8c240..16f33e2 100644
--- a/redis/asyncio/connection.py
+++ b/redis/asyncio/connection.py
@@ -671,7 +671,6 @@ class Connection:
self.set_parser(parser_class)
self._connect_callbacks: List[weakref.WeakMethod[ConnectCallbackT]] = []
self._buffer_cutoff = 6000
- self._lock = asyncio.Lock()
def __repr__(self):
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
@@ -942,39 +941,6 @@ class Connection:
async def read_response(self, disable_decoding: bool = False):
"""Read the response from a previously sent command"""
try:
- async with self._lock:
- if self.socket_timeout:
- async with async_timeout.timeout(self.socket_timeout):
- response = await self._parser.read_response(
- disable_decoding=disable_decoding
- )
- else:
- response = await self._parser.read_response(
- disable_decoding=disable_decoding
- )
- except asyncio.TimeoutError:
- await self.disconnect()
- raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
- except OSError as e:
- await self.disconnect()
- raise ConnectionError(
- f"Error while reading from {self.host}:{self.port} : {e.args}"
- )
- except BaseException:
- await self.disconnect()
- raise
-
- if self.health_check_interval:
- next_time = asyncio.get_running_loop().time() + self.health_check_interval
- self.next_health_check = next_time
-
- if isinstance(response, ResponseError):
- raise response from None
- return response
-
- async def read_response_without_lock(self, disable_decoding: bool = False):
- """Read the response from a previously sent command"""
- try:
if self.socket_timeout:
async with async_timeout.timeout(self.socket_timeout):
response = await self._parser.read_response(
@@ -1241,7 +1207,6 @@ class UnixDomainSocketConnection(Connection): # lgtm [py/missing-call-to-init]
self.set_parser(parser_class)
self._connect_callbacks = []
self._buffer_cutoff = 6000
- self._lock = asyncio.Lock()
def repr_pieces(self) -> Iterable[Tuple[str, Union[str, int]]]:
pieces = [("path", self.path), ("db", self.db)]
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py
index e299395..88cfb1f 100644
--- a/tests/test_asyncio/test_cluster.py
+++ b/tests/test_asyncio/test_cluster.py
@@ -120,7 +120,7 @@ async def get_mocked_redis_client(*args, **kwargs) -> RedisCluster:
def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
connection = mock.AsyncMock()
connection.is_connected = True
- connection.read_response_without_lock.return_value = response
+ connection.read_response.return_value = response
while node._free:
node._free.pop()
node._free.append(connection)
@@ -130,7 +130,7 @@ def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode:
connection = mock.AsyncMock()
connection.is_connected = True
- connection.read_response_without_lock.side_effect = exc
+ connection.read_response.side_effect = exc
while node._free:
node._free.pop()
node._free.append(connection)
@@ -275,16 +275,12 @@ class TestRedisClusterObj:
for node in rc.get_nodes():
assert node.max_connections == 10
- with mock.patch.object(
- Connection, "read_response_without_lock"
- ) as read_response_without_lock:
+ with mock.patch.object(Connection, "read_response") as read_response:
- async def read_response_without_lock_mocked(
- *args: Any, **kwargs: Any
- ) -> None:
+ async def read_response_mocked(*args: Any, **kwargs: Any) -> None:
await asyncio.sleep(10)
- read_response_without_lock.side_effect = read_response_without_lock_mocked
+ read_response.side_effect = read_response_mocked
with pytest.raises(MaxConnectionsError):
await asyncio.gather(
@@ -316,10 +312,10 @@ class TestRedisClusterObj:
assert await r.ping(target_nodes=RedisCluster.PRIMARIES) is True
for primary in primaries:
conn = primary._free.pop()
- assert conn.read_response_without_lock.called is True
+ assert conn.read_response.called is True
for replica in replicas:
conn = replica._free.pop()
- assert conn.read_response_without_lock.called is not True
+ assert conn.read_response.called is not True
async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None:
"""
@@ -333,10 +329,10 @@ class TestRedisClusterObj:
assert await r.ping(target_nodes=RedisCluster.REPLICAS) is True
for replica in replicas:
conn = replica._free.pop()
- assert conn.read_response_without_lock.called is True
+ assert conn.read_response.called is True
for primary in primaries:
conn = primary._free.pop()
- assert conn.read_response_without_lock.called is not True
+ assert conn.read_response.called is not True
await r.close()
@@ -348,7 +344,7 @@ class TestRedisClusterObj:
assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True
for node in r.get_nodes():
conn = node._free.pop()
- assert conn.read_response_without_lock.called is True
+ assert conn.read_response.called is True
async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
"""
@@ -359,7 +355,7 @@ class TestRedisClusterObj:
called_count = 0
for node in r.get_nodes():
conn = node._free.pop()
- if conn.read_response_without_lock.called is True:
+ if conn.read_response.called is True:
called_count += 1
assert called_count == 1
@@ -372,7 +368,7 @@ class TestRedisClusterObj:
mock_node_resp(def_node, "PONG")
assert await r.ping() is True
conn = def_node._free.pop()
- assert conn.read_response_without_lock.called
+ assert conn.read_response.called
async def test_ask_redirection(self, r: RedisCluster) -> None:
"""
@@ -516,7 +512,7 @@ class TestRedisClusterObj:
with mock.patch.multiple(
Connection,
send_command=mock.DEFAULT,
- read_response_without_lock=mock.DEFAULT,
+ read_response=mock.DEFAULT,
_connect=mock.DEFAULT,
can_read=mock.DEFAULT,
on_connect=mock.DEFAULT,
@@ -548,7 +544,7 @@ class TestRedisClusterObj:
# so we'll mock some of the Connection's functions to allow it
execute_command.side_effect = execute_command_mock_first
mocks["send_command"].return_value = True
- mocks["read_response_without_lock"].return_value = "OK"
+ mocks["read_response"].return_value = "OK"
mocks["_connect"].return_value = True
mocks["can_read"].return_value = False
mocks["on_connect"].return_value = True
@@ -857,8 +853,8 @@ class TestClusterRedisCommands:
node0 = r.get_node(default_host, 7000)
node1 = r.get_node(default_host, 7001)
assert await r.cluster_delslots(0, 8192) == [True, True]
- assert node0._free.pop().read_response_without_lock.called
- assert node1._free.pop().read_response_without_lock.called
+ assert node0._free.pop().read_response.called
+ assert node1._free.pop().read_response.called
await r.close()
@@ -1027,7 +1023,7 @@ class TestClusterRedisCommands:
node = r.nodes_manager.get_node_from_slot(12182)
mock_node_resp(node, "OK")
assert await r.cluster_setslot_stable(12182) is True
- assert node._free.pop().read_response_without_lock.called
+ assert node._free.pop().read_response.called
@skip_if_redis_enterprise()
async def test_cluster_replicas(self, r: RedisCluster) -> None:
@@ -1069,7 +1065,7 @@ class TestClusterRedisCommands:
for res in all_replicas_results.values():
assert res is True
for replica in r.get_replicas():
- assert replica._free.pop().read_response_without_lock.called
+ assert replica._free.pop().read_response.called
await r.close()
@@ -1082,7 +1078,7 @@ class TestClusterRedisCommands:
for res in all_replicas_results.values():
assert res is True
for replica in r.get_replicas():
- assert replica._free.pop().read_response_without_lock.called
+ assert replica._free.pop().read_response.called
await r.close()
@@ -2441,8 +2437,8 @@ class TestClusterPipeline:
mock_node_resp_exc(first_node, AskError(ask_msg))
mock_node_resp(ask_node, "MOCK_OK")
res = await pipe.get(key).execute()
- assert first_node._free.pop().read_response_without_lock.await_count
- assert ask_node._free.pop().read_response_without_lock.await_count
+ assert first_node._free.pop().read_response.await_count
+ assert ask_node._free.pop().read_response.await_count
assert res == ["MOCK_OK"]
async def test_moved_redirection_on_slave_with_default(
@@ -2497,7 +2493,7 @@ class TestClusterPipeline:
executed_on_replica = False
for node in slot_nodes:
if node.server_type == REPLICA:
- if node._free.pop().read_response_without_lock.await_count:
+ if node._free.pop().read_response.await_count:
executed_on_replica = True
break
assert executed_on_replica
diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py
index 419c8a7..674a1b9 100644
--- a/tests/test_asyncio/test_connection.py
+++ b/tests/test_asyncio/test_connection.py
@@ -64,6 +64,10 @@ async def test_socket_param_regression(r):
async def test_can_run_concurrent_commands(r):
+ if getattr(r, "connection", None) is not None:
+ # Concurrent commands are only supported on pooled or cluster connections
+ # since there is no synchronization on a single connection.
+ pytest.skip("pool only")
assert await r.ping() is True
assert all(await asyncio.gather(*(r.ping() for _ in range(10))))