summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-11 21:14:14 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-11 21:14:14 +0300
commit56afe91d5dc338db3440b2e9ecdea3e522dba30f (patch)
tree311380b0d953f09919d7e8c4c0a340507e5d0dc5 /tests
parent7248a78e7e787b728b083aaa8199eeba3a3f3023 (diff)
downloadapscheduler-56afe91d5dc338db3440b2e9ecdea3e522dba30f.tar.gz
Implemented a pluggable event broker system
Diffstat (limited to 'tests')
-rw-r--r--tests/conftest.py1
-rw-r--r--tests/test_datastores.py4
-rw-r--r--tests/test_eventbrokers.py279
-rw-r--r--tests/test_events.py135
4 files changed, 281 insertions, 138 deletions
diff --git a/tests/conftest.py b/tests/conftest.py
index bc5bab2..135c18d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -24,7 +24,6 @@ def timezone() -> ZoneInfo:
@pytest.fixture(params=[
- pytest.param(None, id='none'),
pytest.param(PickleSerializer, id='pickle'),
pytest.param(CBORSerializer, id='cbor'),
pytest.param(JSONSerializer, id='json')
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 52f8349..74db6e7 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -40,11 +40,11 @@ async def capture_events(
events.append(event)
if len(events) == limit:
limit_event.set()
- store.unsubscribe(token)
+ store.events.unsubscribe(token)
events: List[Event] = []
limit_event = anyio.Event()
- token = store.subscribe(listener, event_types)
+ token = store.events.subscribe(listener, event_types)
yield events
if limit:
with anyio.fail_after(3):
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
new file mode 100644
index 0000000..6a0f45d
--- /dev/null
+++ b/tests/test_eventbrokers.py
@@ -0,0 +1,279 @@
+from concurrent.futures import Future
+from datetime import datetime, timezone
+from queue import Empty, Queue
+from typing import Callable
+
+import pytest
+from _pytest.fixtures import FixtureRequest
+from _pytest.logging import LogCaptureFixture
+from anyio import create_memory_object_stream, fail_after
+
+from apscheduler.abc import AsyncEventBroker, EventBroker, Serializer
+from apscheduler.events import Event, ScheduleAdded
+
+
+@pytest.fixture
+def local_broker() -> EventBroker:
+ from apscheduler.eventbrokers.local import LocalEventBroker
+
+ return LocalEventBroker()
+
+
+@pytest.fixture
+def local_async_broker() -> AsyncEventBroker:
+ from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker
+
+ return LocalAsyncEventBroker()
+
+
+@pytest.fixture
+def redis_broker(serializer: Serializer) -> EventBroker:
+ from apscheduler.eventbrokers.redis import RedisEventBroker
+
+ broker = RedisEventBroker.from_url('redis://localhost:6379')
+ broker.serializer = serializer
+ return broker
+
+
+@pytest.fixture
+def mqtt_broker(serializer: Serializer) -> EventBroker:
+ from paho.mqtt.client import Client
+
+ from apscheduler.eventbrokers.mqtt import MQTTEventBroker
+
+ return MQTTEventBroker(Client(), serializer=serializer)
+
+
+@pytest.fixture
+async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker:
+ from asyncpg import create_pool
+
+ from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
+
+ pool = await create_pool('postgres://postgres:secret@localhost:5432/testdb')
+ broker = AsyncpgEventBroker.from_asyncpg_pool(pool)
+ broker.serializer = serializer
+ yield broker
+ await pool.close()
+
+
+@pytest.fixture(params=[
+ pytest.param(pytest.lazy_fixture('local_broker'), id='local'),
+ pytest.param(pytest.lazy_fixture('redis_broker'), id='redis'),
+ pytest.param(pytest.lazy_fixture('mqtt_broker'), id='mqtt')
+])
+def broker(request: FixtureRequest) -> Callable[[], EventBroker]:
+ return request.param
+
+
+@pytest.fixture(params=[
+ pytest.param(pytest.lazy_fixture('local_async_broker'), id='local'),
+ pytest.param(pytest.lazy_fixture('asyncpg_broker'), id='asyncpg')
+])
+def async_broker(request: FixtureRequest) -> Callable[[], AsyncEventBroker]:
+ return request.param
+
+
+class TestEventBroker:
+ def test_publish_subscribe(self, broker: EventBroker) -> None:
+ def subscriber1(event) -> None:
+ queue.put_nowait(event)
+
+ def subscriber2(event) -> None:
+ queue.put_nowait(event)
+
+ queue = Queue()
+ with broker:
+ broker.subscribe(subscriber1)
+ broker.subscribe(subscriber2)
+ event = ScheduleAdded(
+ schedule_id='schedule1',
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ broker.publish(event)
+ event1 = queue.get(timeout=3)
+ event2 = queue.get(timeout=1)
+
+ assert event1 == event2
+ assert isinstance(event1, ScheduleAdded)
+ assert isinstance(event1.timestamp, datetime)
+ assert event1.schedule_id == 'schedule1'
+ assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+
+ def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
+ queue = Queue()
+ with broker:
+ token = broker.subscribe(queue.put_nowait)
+ broker.publish(Event())
+ queue.get(timeout=3)
+
+ broker.unsubscribe(token)
+ broker.publish(Event())
+ with pytest.raises(Empty):
+ queue.get(timeout=0.1)
+
+ def test_publish_no_subscribers(self, broker: EventBroker, caplog: LogCaptureFixture) -> None:
+ with broker:
+ broker.publish(Event())
+
+ assert not caplog.text
+
+ def test_publish_exception(self, broker: EventBroker, caplog: LogCaptureFixture) -> None:
+ def bad_subscriber(event: Event) -> None:
+ raise Exception('foo')
+
+ timestamp = datetime.now(timezone.utc)
+ event_future: Future[Event] = Future()
+ with broker:
+ broker.subscribe(bad_subscriber)
+ broker.subscribe(event_future.set_result)
+ broker.publish(Event(timestamp=timestamp))
+
+ event = event_future.result(3)
+ assert isinstance(event, Event)
+ assert event.timestamp == timestamp
+ assert 'Error delivering Event' in caplog.text
+
+
+@pytest.mark.anyio
+class TestAsyncEventBroker:
+ async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None:
+ def subscriber1(event) -> None:
+ send.send_nowait(event)
+
+ async def subscriber2(event) -> None:
+ await send.send(event)
+
+ send, receive = create_memory_object_stream(2)
+ async with async_broker:
+ async_broker.subscribe(subscriber1)
+ async_broker.subscribe(subscriber2)
+ event = ScheduleAdded(
+ schedule_id='schedule1',
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ await async_broker.publish(event)
+
+ with fail_after(3):
+ event1 = await receive.receive()
+ event2 = await receive.receive()
+
+ assert event1 == event2
+ assert isinstance(event1, ScheduleAdded)
+ assert isinstance(event1.timestamp, datetime)
+ assert event1.schedule_id == 'schedule1'
+ assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+
+ async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
+ send, receive = create_memory_object_stream()
+ async with async_broker:
+ token = async_broker.subscribe(send.send)
+ await async_broker.publish(Event())
+ with fail_after(3):
+ await receive.receive()
+
+ async_broker.unsubscribe(token)
+ await async_broker.publish(Event())
+ with pytest.raises(TimeoutError), fail_after(0.1):
+ await receive.receive()
+
+ async def test_publish_no_subscribers(self, async_broker: AsyncEventBroker,
+ caplog: LogCaptureFixture) -> None:
+ async with async_broker:
+ await async_broker.publish(Event())
+
+ assert not caplog.text
+
+ async def test_publish_exception(self, async_broker: AsyncEventBroker,
+ caplog: LogCaptureFixture) -> None:
+ def bad_subscriber(event: Event) -> None:
+ raise Exception('foo')
+
+ timestamp = datetime.now(timezone.utc)
+ events = []
+ async with async_broker:
+ async_broker.subscribe(bad_subscriber)
+ async_broker.subscribe(events.append)
+ await async_broker.publish(Event(timestamp=timestamp))
+
+ assert isinstance(events[0], Event)
+ assert events[0].timestamp == timestamp
+ assert 'Error delivering Event' in caplog.text
+#
+# def test_subscribe_coroutine_callback(self) -> None:
+# async def callback(event: Event) -> None:
+# pass
+#
+# with EventBroker() as eventhub:
+# with pytest.raises(ValueError, match='Coroutine functions are not supported'):
+# eventhub.subscribe(callback)
+#
+# def test_relay_events(self) -> None:
+# timestamp = datetime.now(timezone.utc)
+# events = []
+# with EventBroker() as eventhub1, EventBroker() as eventhub2:
+# eventhub2.relay_events_from(eventhub1)
+# eventhub2.subscribe(events.append)
+# eventhub1.publish(Event(timestamp=timestamp))
+#
+# assert isinstance(events[0], Event)
+# assert events[0].timestamp == timestamp
+#
+#
+# @pytest.mark.anyio
+# class TestAsyncEventHub:
+# async def test_publish(self) -> None:
+# async def async_setitem(event: Event) -> None:
+# events[1] = event
+#
+# timestamp = datetime.now(timezone.utc)
+# events: List[Optional[Event]] = [None, None]
+# async with AsyncEventBroker() as eventhub:
+# eventhub.subscribe(partial(setitem, events, 0))
+# eventhub.subscribe(async_setitem)
+# eventhub.publish(Event(timestamp=timestamp))
+#
+# assert events[0] is events[1]
+# assert isinstance(events[0], Event)
+# assert events[0].timestamp == timestamp
+#
+# async def test_unsubscribe(self) -> None:
+# timestamp = datetime.now(timezone.utc)
+# events = []
+# async with AsyncEventBroker() as eventhub:
+# token = eventhub.subscribe(events.append)
+# eventhub.publish(Event(timestamp=timestamp))
+# eventhub.unsubscribe(token)
+# eventhub.publish(Event(timestamp=timestamp))
+#
+# assert len(events) == 1
+#
+# async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None:
+# async with AsyncEventBroker() as eventhub:
+# eventhub.publish(Event(timestamp=datetime.now(timezone.utc)))
+#
+# assert not caplog.text
+#
+# async def test_publish_exception(self, caplog: LogCaptureFixture) -> None:
+# def bad_subscriber(event: Event) -> None:
+# raise Exception('foo')
+#
+# timestamp = datetime.now(timezone.utc)
+# events = []
+# async with AsyncEventBroker() as eventhub:
+# eventhub.subscribe(bad_subscriber)
+# eventhub.subscribe(events.append)
+# eventhub.publish(Event(timestamp=timestamp))
+#
+# assert isinstance(events[0], Event)
+# assert events[0].timestamp == timestamp
+# assert 'Error delivering Event' in caplog.text
+#
+# async def test_relay_events(self) -> None:
+# timestamp = datetime.now(timezone.utc)
+# events = []
+# async with AsyncEventBroker() as eventhub1, AsyncEventBroker() as eventhub2:
+# eventhub1.relay_events_from(eventhub2)
+# eventhub1.subscribe(events.append)
+# eventhub2.publish(Event(timestamp=timestamp))
+#
+# assert isinstance(events[0], Event)
+# assert events[0].timestamp == timestamp
diff --git a/tests/test_events.py b/tests/test_events.py
deleted file mode 100644
index bbe344f..0000000
--- a/tests/test_events.py
+++ /dev/null
@@ -1,135 +0,0 @@
-from datetime import datetime, timezone
-from functools import partial
-from operator import setitem
-from typing import List, Optional
-
-import pytest
-from _pytest.logging import LogCaptureFixture
-
-from apscheduler.events import AsyncEventHub, Event, EventHub
-
-
-class TestEventHub:
- def test_publish(self) -> None:
- timestamp = datetime.now(timezone.utc)
- events: List[Optional[Event]] = [None, None]
- with EventHub() as eventhub:
- eventhub.subscribe(partial(setitem, events, 0))
- eventhub.subscribe(partial(setitem, events, 1))
- eventhub.publish(Event(timestamp=timestamp))
-
- assert events[0] is events[1]
- assert isinstance(events[0], Event)
- assert events[0].timestamp == timestamp
-
- def test_unsubscribe(self) -> None:
- timestamp = datetime.now(timezone.utc)
- events = []
- with EventHub() as eventhub:
- token = eventhub.subscribe(events.append)
- eventhub.publish(Event(timestamp=timestamp))
- eventhub.unsubscribe(token)
- eventhub.publish(Event(timestamp=timestamp))
-
- assert len(events) == 1
-
- def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None:
- with EventHub() as eventhub:
- eventhub.publish(Event(timestamp=datetime.now(timezone.utc)))
-
- assert not caplog.text
-
- def test_publish_exception(self, caplog: LogCaptureFixture) -> None:
- def bad_subscriber(event: Event) -> None:
- raise Exception('foo')
-
- timestamp = datetime.now(timezone.utc)
- events = []
- with EventHub() as eventhub:
- eventhub.subscribe(bad_subscriber)
- eventhub.subscribe(events.append)
- eventhub.publish(Event(timestamp=timestamp))
-
- assert isinstance(events[0], Event)
- assert events[0].timestamp == timestamp
- assert 'Error delivering Event' in caplog.text
-
- def test_subscribe_coroutine_callback(self) -> None:
- async def callback(event: Event) -> None:
- pass
-
- with EventHub() as eventhub:
- with pytest.raises(ValueError, match='Coroutine functions are not supported'):
- eventhub.subscribe(callback)
-
- def test_relay_events(self) -> None:
- timestamp = datetime.now(timezone.utc)
- events = []
- with EventHub() as eventhub1, EventHub() as eventhub2:
- eventhub2.relay_events_from(eventhub1)
- eventhub2.subscribe(events.append)
- eventhub1.publish(Event(timestamp=timestamp))
-
- assert isinstance(events[0], Event)
- assert events[0].timestamp == timestamp
-
-
-@pytest.mark.anyio
-class TestAsyncEventHub:
- async def test_publish(self) -> None:
- async def async_setitem(event: Event) -> None:
- events[1] = event
-
- timestamp = datetime.now(timezone.utc)
- events: List[Optional[Event]] = [None, None]
- async with AsyncEventHub() as eventhub:
- eventhub.subscribe(partial(setitem, events, 0))
- eventhub.subscribe(async_setitem)
- eventhub.publish(Event(timestamp=timestamp))
-
- assert events[0] is events[1]
- assert isinstance(events[0], Event)
- assert events[0].timestamp == timestamp
-
- async def test_unsubscribe(self) -> None:
- timestamp = datetime.now(timezone.utc)
- events = []
- async with AsyncEventHub() as eventhub:
- token = eventhub.subscribe(events.append)
- eventhub.publish(Event(timestamp=timestamp))
- eventhub.unsubscribe(token)
- eventhub.publish(Event(timestamp=timestamp))
-
- assert len(events) == 1
-
- async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None:
- async with AsyncEventHub() as eventhub:
- eventhub.publish(Event(timestamp=datetime.now(timezone.utc)))
-
- assert not caplog.text
-
- async def test_publish_exception(self, caplog: LogCaptureFixture) -> None:
- def bad_subscriber(event: Event) -> None:
- raise Exception('foo')
-
- timestamp = datetime.now(timezone.utc)
- events = []
- async with AsyncEventHub() as eventhub:
- eventhub.subscribe(bad_subscriber)
- eventhub.subscribe(events.append)
- eventhub.publish(Event(timestamp=timestamp))
-
- assert isinstance(events[0], Event)
- assert events[0].timestamp == timestamp
- assert 'Error delivering Event' in caplog.text
-
- async def test_relay_events(self) -> None:
- timestamp = datetime.now(timezone.utc)
- events = []
- async with AsyncEventHub() as eventhub1, AsyncEventHub() as eventhub2:
- eventhub1.relay_events_from(eventhub2)
- eventhub1.subscribe(events.append)
- eventhub2.publish(Event(timestamp=timestamp))
-
- assert isinstance(events[0], Event)
- assert events[0].timestamp == timestamp