diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_eventbrokers.py | 157 |
1 files changed, 53 insertions, 104 deletions
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py index 7097001..024b63d 100644 --- a/tests/test_eventbrokers.py +++ b/tests/test_eventbrokers.py @@ -79,16 +79,10 @@ def async_broker(request: FixtureRequest) -> Callable[[], AsyncEventBroker]: class TestEventBroker: def test_publish_subscribe(self, broker: EventBroker) -> None: - def subscriber1(event) -> None: - queue.put_nowait(event) - - def subscriber2(event) -> None: - queue.put_nowait(event) - queue = Queue() with broker: - broker.subscribe(subscriber1) - broker.subscribe(subscriber2) + broker.subscribe(queue.put_nowait) + broker.subscribe(queue.put_nowait) event = ScheduleAdded( schedule_id='schedule1', next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)) @@ -102,6 +96,25 @@ class TestEventBroker: assert event1.schedule_id == 'schedule1' assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc) + def test_subscribe_one_shot(self, broker: EventBroker) -> None: + queue = Queue() + with broker: + broker.subscribe(queue.put_nowait, one_shot=True) + event = ScheduleAdded( + schedule_id='schedule1', + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)) + broker.publish(event) + event = ScheduleAdded( + schedule_id='schedule2', + next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc)) + broker.publish(event) + received_event = queue.get(timeout=3) + with pytest.raises(Empty): + queue.get(timeout=0.1) + + assert isinstance(received_event, ScheduleAdded) + assert received_event.schedule_id == 'schedule1' + def test_unsubscribe(self, broker: EventBroker, caplog) -> None: queue = Queue() with broker: @@ -140,24 +153,18 @@ class TestEventBroker: @pytest.mark.anyio class TestAsyncEventBroker: async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None: - def subscriber1(event) -> None: - send.send_nowait(event) - - async def subscriber2(event) -> None: - await send.send(event) - send, receive = create_memory_object_stream(2) async with async_broker: - async_broker.subscribe(subscriber1) - async_broker.subscribe(subscriber2) + async_broker.subscribe(send.send) + async_broker.subscribe(send.send_nowait) event = ScheduleAdded( schedule_id='schedule1', next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)) await async_broker.publish(event) - with fail_after(3): - event1 = await receive.receive() - event2 = await receive.receive() + with fail_after(3): + event1 = await receive.receive() + event2 = await receive.receive() assert event1 == event2 assert isinstance(event1, ScheduleAdded) @@ -165,6 +172,28 @@ class TestAsyncEventBroker: assert event1.schedule_id == 'schedule1' assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc) + async def test_subscribe_one_shot(self, async_broker: AsyncEventBroker) -> None: + send, receive = create_memory_object_stream(2) + async with async_broker: + async_broker.subscribe(send.send, one_shot=True) + event = ScheduleAdded( + schedule_id='schedule1', + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)) + await async_broker.publish(event) + event = ScheduleAdded( + schedule_id='schedule2', + next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc)) + await async_broker.publish(event) + + with fail_after(3): + received_event = await receive.receive() + + with pytest.raises(TimeoutError), fail_after(0.1): + await receive.receive() + + assert isinstance(received_event, ScheduleAdded) + assert received_event.schedule_id == 'schedule1' + async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None: send, receive = create_memory_object_stream() async with async_broker: @@ -191,92 +220,12 @@ class TestAsyncEventBroker: raise Exception('foo') timestamp = datetime.now(timezone.utc) - events = [] + send, receive = create_memory_object_stream() async with async_broker: async_broker.subscribe(bad_subscriber) - async_broker.subscribe(events.append) + async_broker.subscribe(send.send) await async_broker.publish(Event(timestamp=timestamp)) - assert isinstance(events[0], Event) - assert events[0].timestamp == timestamp - assert 'Error delivering Event' in caplog.text -# -# def test_subscribe_coroutine_callback(self) -> None: -# async def callback(event: Event) -> None: -# pass -# -# with EventBroker() as eventhub: -# with pytest.raises(ValueError, match='Coroutine functions are not supported'): -# eventhub.subscribe(callback) -# -# def test_relay_events(self) -> None: -# timestamp = datetime.now(timezone.utc) -# events = [] -# with EventBroker() as eventhub1, EventBroker() as eventhub2: -# eventhub2.relay_events_from(eventhub1) -# eventhub2.subscribe(events.append) -# eventhub1.publish(Event(timestamp=timestamp)) -# -# assert isinstance(events[0], Event) -# assert events[0].timestamp == timestamp -# -# -# @pytest.mark.anyio -# class TestAsyncEventHub: -# async def test_publish(self) -> None: -# async def async_setitem(event: Event) -> None: -# events[1] = event -# -# timestamp = datetime.now(timezone.utc) -# events: List[Optional[Event]] = [None, None] -# async with AsyncEventBroker() as eventhub: -# eventhub.subscribe(partial(setitem, events, 0)) -# eventhub.subscribe(async_setitem) -# eventhub.publish(Event(timestamp=timestamp)) -# -# assert events[0] is events[1] -# assert isinstance(events[0], Event) -# assert events[0].timestamp == timestamp -# -# async def test_unsubscribe(self) -> None: -# timestamp = datetime.now(timezone.utc) -# events = [] -# async with AsyncEventBroker() as eventhub: -# token = eventhub.subscribe(events.append) -# eventhub.publish(Event(timestamp=timestamp)) -# eventhub.unsubscribe(token) -# eventhub.publish(Event(timestamp=timestamp)) -# -# assert len(events) == 1 -# -# async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None: -# async with AsyncEventBroker() as eventhub: -# eventhub.publish(Event(timestamp=datetime.now(timezone.utc))) -# -# assert not caplog.text -# -# async def test_publish_exception(self, caplog: LogCaptureFixture) -> None: -# def bad_subscriber(event: Event) -> None: -# raise Exception('foo') -# -# timestamp = datetime.now(timezone.utc) -# events = [] -# async with AsyncEventBroker() as eventhub: -# eventhub.subscribe(bad_subscriber) -# eventhub.subscribe(events.append) -# eventhub.publish(Event(timestamp=timestamp)) -# -# assert isinstance(events[0], Event) -# assert events[0].timestamp == timestamp -# assert 'Error delivering Event' in caplog.text -# -# async def test_relay_events(self) -> None: -# timestamp = datetime.now(timezone.utc) -# events = [] -# async with AsyncEventBroker() as eventhub1, AsyncEventBroker() as eventhub2: -# eventhub1.relay_events_from(eventhub2) -# eventhub1.subscribe(events.append) -# eventhub2.publish(Event(timestamp=timestamp)) -# -# assert isinstance(events[0], Event) -# assert events[0].timestamp == timestamp + received_event = await receive.receive() + assert received_event.timestamp == timestamp + assert 'Error delivering Event' in caplog.text |