blob: fa4c2bede3916b20b3973f285a17b14591526c31 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import asyncio
import blinker
def test_send_async():
calls = []
async def receiver_a(sender):
calls.append(receiver_a)
return 'value a'
async def receiver_b(sender):
calls.append(receiver_b)
return 'value b'
def receiver_c(sender):
calls.append(receiver_c)
return 'value c'
sig = blinker.Signal()
sig.connect(receiver_a)
sig.connect(receiver_b)
sig.connect(receiver_c)
async def collect():
return sig.send_async()
loop = asyncio.get_event_loop()
results = loop.run_until_complete(collect())
expected = {
receiver_a: 'value a',
receiver_b: 'value b',
receiver_c: 'value c',
}
assert set(calls) == set(expected.keys())
collected_results = {v.result() for r, v in results}
assert collected_results == set(expected.values())
|