summaryrefslogtreecommitdiff
path: root/tests/test_asyncio/test_pubsub.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_asyncio/test_pubsub.py')
-rw-r--r--tests/test_asyncio/test_pubsub.py33
1 files changed, 33 insertions, 0 deletions
diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py
index 9efcd3c..f71ec7e 100644
--- a/tests/test_asyncio/test_pubsub.py
+++ b/tests/test_asyncio/test_pubsub.py
@@ -2,6 +2,7 @@ import asyncio
import sys
from typing import Optional
+import async_timeout
import pytest
if sys.version_info[0:2] == (3, 6):
@@ -658,3 +659,35 @@ class TestPubSubRun:
except asyncio.CancelledError:
pass
assert str(e) == "error"
+
+ async def test_late_subscribe(self, r: redis.Redis):
+ def callback(message):
+ messages.put_nowait(message)
+
+ messages = asyncio.Queue()
+ p = r.pubsub()
+ task = asyncio.get_event_loop().create_task(p.run())
+ # wait until loop gets settled. Add a subscription
+ await asyncio.sleep(0.1)
+ await p.subscribe(foo=callback)
+ # wait tof the subscribe to finish. Cannot use _subscribe() because
+ # p.run() is already accepting messages
+ await asyncio.sleep(0.1)
+ await r.publish("foo", "bar")
+ message = None
+ try:
+ async with async_timeout.timeout(0.1):
+ message = await messages.get()
+ except asyncio.TimeoutError:
+ pass
+ task.cancel()
+ # we expect a cancelled error, not the Runtime error
+ # ("did you forget to call subscribe()"")
+ with pytest.raises(asyncio.CancelledError):
+ await task
+ assert message == {
+ "channel": b"foo",
+ "data": b"bar",
+ "pattern": None,
+ "type": "message",
+ }