summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 16:11:13 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-13 01:07:44 +0300
commit9568bf2f1297c87ec1b93306b79de925fb2da08e (patch)
tree2969e7951f8883c41f1359ebbc98bcf85a3fdad6 /tests
parent0a6b0f683edee8bf22d85dc655ad61a8285fd312 (diff)
downloadapscheduler-9568bf2f1297c87ec1b93306b79de925fb2da08e.tar.gz
Implemented one-shot event subscriptions
Such subscriptions are delivered the first matching event and then unsubscribed automatically.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_eventbrokers.py157
1 files changed, 53 insertions, 104 deletions
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
index 7097001..024b63d 100644
--- a/tests/test_eventbrokers.py
+++ b/tests/test_eventbrokers.py
@@ -79,16 +79,10 @@ def async_broker(request: FixtureRequest) -> Callable[[], AsyncEventBroker]:
class TestEventBroker:
def test_publish_subscribe(self, broker: EventBroker) -> None:
- def subscriber1(event) -> None:
- queue.put_nowait(event)
-
- def subscriber2(event) -> None:
- queue.put_nowait(event)
-
queue = Queue()
with broker:
- broker.subscribe(subscriber1)
- broker.subscribe(subscriber2)
+ broker.subscribe(queue.put_nowait)
+ broker.subscribe(queue.put_nowait)
event = ScheduleAdded(
schedule_id='schedule1',
next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
@@ -102,6 +96,25 @@ class TestEventBroker:
assert event1.schedule_id == 'schedule1'
assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+ def test_subscribe_one_shot(self, broker: EventBroker) -> None:
+ queue = Queue()
+ with broker:
+ broker.subscribe(queue.put_nowait, one_shot=True)
+ event = ScheduleAdded(
+ schedule_id='schedule1',
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ broker.publish(event)
+ event = ScheduleAdded(
+ schedule_id='schedule2',
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc))
+ broker.publish(event)
+ received_event = queue.get(timeout=3)
+ with pytest.raises(Empty):
+ queue.get(timeout=0.1)
+
+ assert isinstance(received_event, ScheduleAdded)
+ assert received_event.schedule_id == 'schedule1'
+
def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
queue = Queue()
with broker:
@@ -140,24 +153,18 @@ class TestEventBroker:
@pytest.mark.anyio
class TestAsyncEventBroker:
async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None:
- def subscriber1(event) -> None:
- send.send_nowait(event)
-
- async def subscriber2(event) -> None:
- await send.send(event)
-
send, receive = create_memory_object_stream(2)
async with async_broker:
- async_broker.subscribe(subscriber1)
- async_broker.subscribe(subscriber2)
+ async_broker.subscribe(send.send)
+ async_broker.subscribe(send.send_nowait)
event = ScheduleAdded(
schedule_id='schedule1',
next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
await async_broker.publish(event)
- with fail_after(3):
- event1 = await receive.receive()
- event2 = await receive.receive()
+ with fail_after(3):
+ event1 = await receive.receive()
+ event2 = await receive.receive()
assert event1 == event2
assert isinstance(event1, ScheduleAdded)
@@ -165,6 +172,28 @@ class TestAsyncEventBroker:
assert event1.schedule_id == 'schedule1'
assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+ async def test_subscribe_one_shot(self, async_broker: AsyncEventBroker) -> None:
+ send, receive = create_memory_object_stream(2)
+ async with async_broker:
+ async_broker.subscribe(send.send, one_shot=True)
+ event = ScheduleAdded(
+ schedule_id='schedule1',
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ await async_broker.publish(event)
+ event = ScheduleAdded(
+ schedule_id='schedule2',
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc))
+ await async_broker.publish(event)
+
+ with fail_after(3):
+ received_event = await receive.receive()
+
+ with pytest.raises(TimeoutError), fail_after(0.1):
+ await receive.receive()
+
+ assert isinstance(received_event, ScheduleAdded)
+ assert received_event.schedule_id == 'schedule1'
+
async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream()
async with async_broker:
@@ -191,92 +220,12 @@ class TestAsyncEventBroker:
raise Exception('foo')
timestamp = datetime.now(timezone.utc)
- events = []
+ send, receive = create_memory_object_stream()
async with async_broker:
async_broker.subscribe(bad_subscriber)
- async_broker.subscribe(events.append)
+ async_broker.subscribe(send.send)
await async_broker.publish(Event(timestamp=timestamp))
- assert isinstance(events[0], Event)
- assert events[0].timestamp == timestamp
- assert 'Error delivering Event' in caplog.text
-#
-# def test_subscribe_coroutine_callback(self) -> None:
-# async def callback(event: Event) -> None:
-# pass
-#
-# with EventBroker() as eventhub:
-# with pytest.raises(ValueError, match='Coroutine functions are not supported'):
-# eventhub.subscribe(callback)
-#
-# def test_relay_events(self) -> None:
-# timestamp = datetime.now(timezone.utc)
-# events = []
-# with EventBroker() as eventhub1, EventBroker() as eventhub2:
-# eventhub2.relay_events_from(eventhub1)
-# eventhub2.subscribe(events.append)
-# eventhub1.publish(Event(timestamp=timestamp))
-#
-# assert isinstance(events[0], Event)
-# assert events[0].timestamp == timestamp
-#
-#
-# @pytest.mark.anyio
-# class TestAsyncEventHub:
-# async def test_publish(self) -> None:
-# async def async_setitem(event: Event) -> None:
-# events[1] = event
-#
-# timestamp = datetime.now(timezone.utc)
-# events: List[Optional[Event]] = [None, None]
-# async with AsyncEventBroker() as eventhub:
-# eventhub.subscribe(partial(setitem, events, 0))
-# eventhub.subscribe(async_setitem)
-# eventhub.publish(Event(timestamp=timestamp))
-#
-# assert events[0] is events[1]
-# assert isinstance(events[0], Event)
-# assert events[0].timestamp == timestamp
-#
-# async def test_unsubscribe(self) -> None:
-# timestamp = datetime.now(timezone.utc)
-# events = []
-# async with AsyncEventBroker() as eventhub:
-# token = eventhub.subscribe(events.append)
-# eventhub.publish(Event(timestamp=timestamp))
-# eventhub.unsubscribe(token)
-# eventhub.publish(Event(timestamp=timestamp))
-#
-# assert len(events) == 1
-#
-# async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None:
-# async with AsyncEventBroker() as eventhub:
-# eventhub.publish(Event(timestamp=datetime.now(timezone.utc)))
-#
-# assert not caplog.text
-#
-# async def test_publish_exception(self, caplog: LogCaptureFixture) -> None:
-# def bad_subscriber(event: Event) -> None:
-# raise Exception('foo')
-#
-# timestamp = datetime.now(timezone.utc)
-# events = []
-# async with AsyncEventBroker() as eventhub:
-# eventhub.subscribe(bad_subscriber)
-# eventhub.subscribe(events.append)
-# eventhub.publish(Event(timestamp=timestamp))
-#
-# assert isinstance(events[0], Event)
-# assert events[0].timestamp == timestamp
-# assert 'Error delivering Event' in caplog.text
-#
-# async def test_relay_events(self) -> None:
-# timestamp = datetime.now(timezone.utc)
-# events = []
-# async with AsyncEventBroker() as eventhub1, AsyncEventBroker() as eventhub2:
-# eventhub1.relay_events_from(eventhub2)
-# eventhub1.subscribe(events.append)
-# eventhub2.publish(Event(timestamp=timestamp))
-#
-# assert isinstance(events[0], Event)
-# assert events[0].timestamp == timestamp
+ received_event = await receive.receive()
+ assert received_event.timestamp == timestamp
+ assert 'Error delivering Event' in caplog.text