summaryrefslogtreecommitdiff
path: root/redis/asyncio/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/asyncio/connection.py')
-rw-r--r--redis/asyncio/connection.py51
1 files changed, 20 insertions, 31 deletions
diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py
index 64848f4..53b41af 100644
--- a/redis/asyncio/connection.py
+++ b/redis/asyncio/connection.py
@@ -208,7 +208,7 @@ class BaseParser:
def on_connect(self, connection: "Connection"):
raise NotImplementedError()
- async def can_read(self, timeout: float) -> bool:
+ async def can_read_destructive(self) -> bool:
raise NotImplementedError()
async def read_response(
@@ -286,9 +286,9 @@ class SocketBuffer:
return False
raise ConnectionError(f"Error while reading from socket: {ex.args}")
- async def can_read(self, timeout: float) -> bool:
+ async def can_read_destructive(self) -> bool:
return bool(self.length) or await self._read_from_socket(
- timeout=timeout, raise_on_timeout=False
+ timeout=0, raise_on_timeout=False
)
async def read(self, length: int) -> bytes:
@@ -386,8 +386,8 @@ class PythonParser(BaseParser):
self._buffer = None
self.encoder = None
- async def can_read(self, timeout: float):
- return self._buffer and bool(await self._buffer.can_read(timeout))
+ async def can_read_destructive(self):
+ return self._buffer and bool(await self._buffer.can_read_destructive())
async def read_response(
self, disable_decoding: bool = False
@@ -444,9 +444,7 @@ class PythonParser(BaseParser):
class HiredisParser(BaseParser):
"""Parser class for connections using Hiredis"""
- __slots__ = BaseParser.__slots__ + ("_next_response", "_reader", "_socket_timeout")
-
- _next_response: bool
+ __slots__ = BaseParser.__slots__ + ("_reader", "_socket_timeout")
def __init__(self, socket_read_size: int):
if not HIREDIS_AVAILABLE:
@@ -466,23 +464,18 @@ class HiredisParser(BaseParser):
kwargs["errors"] = connection.encoder.encoding_errors
self._reader = hiredis.Reader(**kwargs)
- self._next_response = False
self._socket_timeout = connection.socket_timeout
def on_disconnect(self):
self._stream = None
self._reader = None
- self._next_response = False
- async def can_read(self, timeout: float):
+ async def can_read_destructive(self):
if not self._stream or not self._reader:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
-
- if self._next_response is False:
- self._next_response = self._reader.gets()
- if self._next_response is False:
- return await self.read_from_socket(timeout=timeout, raise_on_timeout=False)
- return True
+ if self._reader.gets():
+ return True
+ return await self.read_from_socket(timeout=0, raise_on_timeout=False)
async def read_from_socket(
self,
@@ -523,12 +516,6 @@ class HiredisParser(BaseParser):
self.on_disconnect()
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
- # _next_response might be cached from a can_read() call
- if self._next_response is not False:
- response = self._next_response
- self._next_response = False
- return response
-
response = self._reader.gets()
while response is False:
await self.read_from_socket()
@@ -925,12 +912,10 @@ class Connection:
self.pack_command(*args), check_health=kwargs.get("check_health", True)
)
- async def can_read(self, timeout: float = 0):
+ async def can_read_destructive(self):
"""Poll the socket to see if there's data that can be read."""
- if not self.is_connected:
- await self.connect()
try:
- return await self._parser.can_read(timeout)
+ return await self._parser.can_read_destructive()
except OSError as e:
await self.disconnect(nowait=True)
raise ConnectionError(
@@ -957,6 +942,10 @@ class Connection:
raise ConnectionError(
f"Error while reading from {self.host}:{self.port} : {e.args}"
)
+ except asyncio.CancelledError:
+ # need this check for 3.7, where CancelledError
+ # is subclass of Exception, not BaseException
+ raise
except Exception:
await self.disconnect(nowait=True)
raise
@@ -1498,12 +1487,12 @@ class ConnectionPool:
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
try:
- if await connection.can_read():
+ if await connection.can_read_destructive():
raise ConnectionError("Connection has data") from None
except ConnectionError:
await connection.disconnect()
await connection.connect()
- if await connection.can_read():
+ if await connection.can_read_destructive():
raise ConnectionError("Connection not ready") from None
except BaseException:
# release the connection back to the pool so that we don't
@@ -1699,12 +1688,12 @@ class BlockingConnectionPool(ConnectionPool):
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
try:
- if await connection.can_read():
+ if await connection.can_read_destructive():
raise ConnectionError("Connection has data") from None
except ConnectionError:
await connection.disconnect()
await connection.connect()
- if await connection.can_read():
+ if await connection.can_read_destructive():
raise ConnectionError("Connection not ready") from None
except BaseException:
# release the connection back to the pool so that we don't leak it