summaryrefslogtreecommitdiff
path: root/tests/test_eventbrokers.py
blob: 9b98f60014bad4c9457cbe494674734dba75ae72 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from __future__ import annotations

from contextlib import AsyncExitStack
from datetime import datetime, timezone

import pytest
from _pytest.logging import LogCaptureFixture
from anyio import CancelScope, create_memory_object_stream, fail_after

from apscheduler import Event, ScheduleAdded
from apscheduler.abc import EventBroker

pytestmark = pytest.mark.anyio


async def test_publish_subscribe(event_broker: EventBroker) -> None:
    send, receive = create_memory_object_stream(2)
    event_broker.subscribe(send.send)
    event_broker.subscribe(send.send_nowait)
    event = ScheduleAdded(
        schedule_id="schedule1",
        next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
    )
    await event_broker.publish(event)

    with fail_after(3):
        event1 = await receive.receive()
        event2 = await receive.receive()

    assert event1 == event2
    assert isinstance(event1, ScheduleAdded)
    assert isinstance(event1.timestamp, datetime)
    assert event1.schedule_id == "schedule1"
    assert event1.next_fire_time == datetime(
        2021, 9, 11, 12, 31, 56, 254867, timezone.utc
    )


async def test_subscribe_one_shot(event_broker: EventBroker) -> None:
    send, receive = create_memory_object_stream(2)
    event_broker.subscribe(send.send, one_shot=True)
    event = ScheduleAdded(
        schedule_id="schedule1",
        next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
    )
    await event_broker.publish(event)
    event = ScheduleAdded(
        schedule_id="schedule2",
        next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
    )
    await event_broker.publish(event)

    with fail_after(3):
        received_event = await receive.receive()

    with pytest.raises(TimeoutError), fail_after(0.1):
        await receive.receive()

    assert isinstance(received_event, ScheduleAdded)
    assert received_event.schedule_id == "schedule1"


async def test_unsubscribe(event_broker: EventBroker) -> None:
    send, receive = create_memory_object_stream()
    subscription = event_broker.subscribe(send.send)
    await event_broker.publish(Event())
    with fail_after(3):
        await receive.receive()

    subscription.unsubscribe()
    await event_broker.publish(Event())
    with pytest.raises(TimeoutError), fail_after(0.1):
        await receive.receive()


async def test_publish_no_subscribers(event_broker, caplog: LogCaptureFixture) -> None:
    await event_broker.publish(Event())
    assert not caplog.text


async def test_publish_exception(event_broker, caplog: LogCaptureFixture) -> None:
    def bad_subscriber(event: Event) -> None:
        raise Exception("foo")

    timestamp = datetime.now(timezone.utc)
    send, receive = create_memory_object_stream()
    event_broker.subscribe(bad_subscriber)
    event_broker.subscribe(send.send)
    await event_broker.publish(Event(timestamp=timestamp))

    received_event = await receive.receive()
    assert received_event.timestamp == timestamp
    assert "Error delivering Event" in caplog.text


async def test_cancel_start(raw_event_broker: EventBroker) -> None:
    with CancelScope() as scope:
        scope.cancel()
        async with AsyncExitStack() as exit_stack:
            await raw_event_broker.start(exit_stack)


async def test_cancel_stop(raw_event_broker: EventBroker) -> None:
    with CancelScope() as scope:
        async with AsyncExitStack() as exit_stack:
            await raw_event_broker.start(exit_stack)
            scope.cancel()