summaryrefslogtreecommitdiff
path: root/tests/test_asyncio/test_pubsub.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_asyncio/test_pubsub.py')
-rw-r--r--tests/test_asyncio/test_pubsub.py73
1 files changed, 43 insertions, 30 deletions
diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py
index 555cfdb..32268fe 100644
--- a/tests/test_asyncio/test_pubsub.py
+++ b/tests/test_asyncio/test_pubsub.py
@@ -819,7 +819,7 @@ class TestPubSubAutoReconnect:
"type": "subscribe",
}
- async def mycleanup(self):
+ async def myfinish(self):
message = await self.messages.get()
assert message == {
"channel": b"foo",
@@ -827,6 +827,8 @@ class TestPubSubAutoReconnect:
"pattern": None,
"type": "subscribe",
}
+
+ async def mykill(self):
# kill thread
async with self.cond:
self.state = 4 # quit
@@ -836,41 +838,52 @@ class TestPubSubAutoReconnect:
"""
Test that a socket error will cause reconnect
"""
- async with async_timeout.timeout(self.timeout):
- await self.mysetup(r, method)
- # now, disconnect the connection, and wait for it to be re-established
- async with self.cond:
- assert self.state == 0
- self.state = 1
- with mock.patch.object(self.pubsub.connection, "_parser") as mockobj:
- mockobj.read_response.side_effect = socket.error
- mockobj.can_read.side_effect = socket.error
- # wait until task noticies the disconnect until we undo the patch
- await self.cond.wait_for(lambda: self.state >= 2)
- assert not self.pubsub.connection.is_connected
- # it is in a disconnecte state
- # wait for reconnect
- await self.cond.wait_for(lambda: self.pubsub.connection.is_connected)
- assert self.state == 3
+ try:
+ async with async_timeout.timeout(self.timeout):
+ await self.mysetup(r, method)
+ # now, disconnect the connection, and wait for it to be re-established
+ async with self.cond:
+ assert self.state == 0
+ self.state = 1
+ with mock.patch.object(self.pubsub.connection, "_parser") as m:
+ m.read_response.side_effect = socket.error
+ m.can_read.side_effect = socket.error
+ # wait until task noticies the disconnect until we
+ # undo the patch
+ await self.cond.wait_for(lambda: self.state >= 2)
+ assert not self.pubsub.connection.is_connected
+ # it is in a disconnecte state
+ # wait for reconnect
+ await self.cond.wait_for(
+ lambda: self.pubsub.connection.is_connected
+ )
+ assert self.state == 3
- await self.mycleanup()
+ await self.myfinish()
+ finally:
+ await self.mykill()
async def test_reconnect_disconnect(self, r: redis.Redis, method):
"""
Test that a manual disconnect() will cause reconnect
"""
- async with async_timeout.timeout(self.timeout):
- await self.mysetup(r, method)
- # now, disconnect the connection, and wait for it to be re-established
- async with self.cond:
- self.state = 1
- await self.pubsub.connection.disconnect()
- assert not self.pubsub.connection.is_connected
- # wait for reconnect
- await self.cond.wait_for(lambda: self.pubsub.connection.is_connected)
- assert self.state == 3
-
- await self.mycleanup()
+ try:
+ async with async_timeout.timeout(self.timeout):
+ await self.mysetup(r, method)
+ # now, disconnect the connection, and wait for it to be re-established
+ async with self.cond:
+ self.state = 1
+ await self.pubsub.connection.disconnect()
+ assert not self.pubsub.connection.is_connected
+ # wait for reconnect
+ await self.cond.wait_for(
+ lambda: self.pubsub.connection.is_connected
+ )
+ assert self.state == 3
+
+ await self.myfinish()
+ finally:
+ await self.mykill()
async def loop(self):
# reader loop, performing state transitions as it