1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
from __future__ import annotations
from contextlib import AsyncExitStack
from datetime import datetime, timezone
import pytest
from _pytest.logging import LogCaptureFixture
from anyio import CancelScope, create_memory_object_stream, fail_after
from apscheduler import Event, ScheduleAdded
from apscheduler.abc import EventBroker
# Module-level pytest marker: applies ``pytest.mark.anyio`` to every test in
# this file so the ``async def`` tests below are run by the AnyIO pytest plugin.
pytestmark = pytest.mark.anyio
async def test_publish_subscribe(event_broker: EventBroker) -> None:
    """Check that one published event reaches both registered subscribers.

    Two callbacks feeding the same memory object stream are subscribed, a
    single ``ScheduleAdded`` event is published, and the two received copies
    are compared with each other and with the values that were published.
    """
    fire_time = datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)

    # A buffer of 2 leaves room for both deliveries of the one event.
    sender, receiver = create_memory_object_stream(2)
    event_broker.subscribe(sender.send)
    event_broker.subscribe(sender.send_nowait)
    await event_broker.publish(
        ScheduleAdded(schedule_id="schedule1", next_fire_time=fire_time)
    )

    # Fail the test instead of hanging if either delivery never arrives.
    with fail_after(3):
        first = await receiver.receive()
        second = await receiver.receive()

    assert first == second
    assert isinstance(first, ScheduleAdded)
    assert isinstance(first.timestamp, datetime)
    assert first.schedule_id == "schedule1"
    assert first.next_fire_time == fire_time
async def test_subscribe_one_shot(event_broker: EventBroker) -> None:
    """Check that a ``one_shot=True`` subscriber only receives the first event.

    Two events are published in sequence; only ``schedule1`` may show up on
    the stream, and a further receive attempt has to time out.
    """
    sender, receiver = create_memory_object_stream(2)
    event_broker.subscribe(sender.send, one_shot=True)

    first_event = ScheduleAdded(
        schedule_id="schedule1",
        next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
    )
    await event_broker.publish(first_event)

    second_event = ScheduleAdded(
        schedule_id="schedule2",
        next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
    )
    await event_broker.publish(second_event)

    with fail_after(3):
        delivered = await receiver.receive()

    # Nothing else may arrive, so this short wait is expected to time out.
    with pytest.raises(TimeoutError):
        with fail_after(0.1):
            await receiver.receive()

    assert isinstance(delivered, ScheduleAdded)
    assert delivered.schedule_id == "schedule1"
async def test_unsubscribe(event_broker: EventBroker) -> None:
    """Check that no events are delivered after ``unsubscribe()`` is called."""
    sender, receiver = create_memory_object_stream()
    handle = event_broker.subscribe(sender.send)

    # While the subscription is active, a published event is delivered.
    await event_broker.publish(Event())
    with fail_after(3):
        await receiver.receive()

    # After unsubscribing, waiting for another event is expected to time out.
    handle.unsubscribe()
    await event_broker.publish(Event())
    with pytest.raises(TimeoutError):
        with fail_after(0.1):
            await receiver.receive()
async def test_publish_no_subscribers(
    event_broker: EventBroker, caplog: LogCaptureFixture
) -> None:
    """Check that publishing an event with no subscribers logs nothing."""
    await event_broker.publish(Event())
    # ``caplog.text`` is only empty when no log output was captured.
    assert not caplog.text
async def test_publish_exception(
    event_broker: EventBroker, caplog: LogCaptureFixture
) -> None:
    """Check that a raising subscriber does not stop delivery to other ones.

    A subscriber that always raises is registered ahead of a working one.
    The working subscriber must still get the event, and the failure must be
    reported in the captured log output.
    """

    def bad_subscriber(event: Event) -> None:
        raise Exception("foo")

    timestamp = datetime.now(timezone.utc)
    send, receive = create_memory_object_stream()
    event_broker.subscribe(bad_subscriber)
    event_broker.subscribe(send.send)
    await event_broker.publish(Event(timestamp=timestamp))

    # Bound the wait so that a missing delivery fails the test with a
    # TimeoutError instead of hanging the whole test run.
    with fail_after(3):
        received_event = await receive.receive()

    assert received_event.timestamp == timestamp
    assert "Error delivering Event" in caplog.text
async def test_cancel_start(raw_event_broker: EventBroker) -> None:
    """Start the broker inside a cancel scope that has already been cancelled.

    The test passes as long as no exception escapes the cancel scope.
    """
    with CancelScope() as cancel_scope:
        # Cancel first, so ``start()`` is awaited under a cancelled scope.
        cancel_scope.cancel()
        async with AsyncExitStack() as stack:
            await raw_event_broker.start(stack)
async def test_cancel_stop(raw_event_broker: EventBroker) -> None:
    """Cancel the enclosing scope after the broker has been started.

    The ``AsyncExitStack`` handed to ``start()`` is then unwound while the
    scope is already cancelled. The test passes as long as no exception
    escapes the cancel scope.
    """
    with CancelScope() as cancel_scope:
        async with AsyncExitStack() as stack:
            await raw_event_broker.start(stack)
            # Cancel only after start-up, so the exit stack's teardown is
            # what runs under cancellation.
            cancel_scope.cancel()
|