summaryrefslogtreecommitdiff
path: root/tests/test_eventbrokers.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-04-20 01:00:57 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-04-20 01:11:20 +0300
commitb20f62d929eed84ad18020bb82dd43d8cb70da4d (patch)
treec42bf1877dd54755c55c649269e1254995bdf0c9 /tests/test_eventbrokers.py
parent82992cd427a9ab2351d8e0719b82d826dff5a521 (diff)
downloadapscheduler-b20f62d929eed84ad18020bb82dd43d8cb70da4d.tar.gz
Switched to Black for code formatting
Diffstat (limited to 'tests/test_eventbrokers.py')
-rw-r--r--tests/test_eventbrokers.py111
1 files changed, 69 insertions, 42 deletions
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
index 1d29ede..7ec90ab 100644
--- a/tests/test_eventbrokers.py
+++ b/tests/test_eventbrokers.py
@@ -33,7 +33,7 @@ def local_async_broker() -> AsyncEventBroker:
def redis_broker(serializer: Serializer) -> EventBroker:
from apscheduler.eventbrokers.redis import RedisEventBroker
- broker = RedisEventBroker.from_url('redis://localhost:6379')
+ broker = RedisEventBroker.from_url("redis://localhost:6379")
broker.serializer = serializer
return broker
@@ -53,29 +53,40 @@ async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker:
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
- pool = await create_pool('postgres://postgres:secret@localhost:5432/testdb')
+ pool = await create_pool("postgres://postgres:secret@localhost:5432/testdb")
broker = AsyncpgEventBroker.from_asyncpg_pool(pool)
broker.serializer = serializer
yield broker
await pool.close()
-@pytest.fixture(params=[
- pytest.param(lazy_fixture('local_broker'), id='local'),
- pytest.param(lazy_fixture('redis_broker'), id='redis',
- marks=[pytest.mark.external_service]),
- pytest.param(lazy_fixture('mqtt_broker'), id='mqtt',
- marks=[pytest.mark.external_service])
-])
+@pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("local_broker"), id="local"),
+ pytest.param(
+ lazy_fixture("redis_broker"),
+ id="redis",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("mqtt_broker"), id="mqtt", marks=[pytest.mark.external_service]
+ ),
+ ]
+)
def broker(request: SubRequest) -> Callable[[], EventBroker]:
return request.param
-@pytest.fixture(params=[
- pytest.param(lazy_fixture('local_async_broker'), id='local'),
- pytest.param(lazy_fixture('asyncpg_broker'), id='asyncpg',
- marks=[pytest.mark.external_service])
-])
+@pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("local_async_broker"), id="local"),
+ pytest.param(
+ lazy_fixture("asyncpg_broker"),
+ id="asyncpg",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+)
def async_broker(request: SubRequest) -> Callable[[], AsyncEventBroker]:
return request.param
@@ -87,8 +98,9 @@ class TestEventBroker:
broker.subscribe(queue.put_nowait)
broker.subscribe(queue.put_nowait)
event = ScheduleAdded(
- schedule_id='schedule1',
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
broker.publish(event)
event1 = queue.get(timeout=3)
event2 = queue.get(timeout=1)
@@ -96,27 +108,31 @@ class TestEventBroker:
assert event1 == event2
assert isinstance(event1, ScheduleAdded)
assert isinstance(event1.timestamp, datetime)
- assert event1.schedule_id == 'schedule1'
- assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+ assert event1.schedule_id == "schedule1"
+ assert event1.next_fire_time == datetime(
+ 2021, 9, 11, 12, 31, 56, 254867, timezone.utc
+ )
def test_subscribe_one_shot(self, broker: EventBroker) -> None:
queue: Queue[Event] = Queue()
with broker:
broker.subscribe(queue.put_nowait, one_shot=True)
event = ScheduleAdded(
- schedule_id='schedule1',
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
broker.publish(event)
event = ScheduleAdded(
- schedule_id='schedule2',
- next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc))
+ schedule_id="schedule2",
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
+ )
broker.publish(event)
received_event = queue.get(timeout=3)
with pytest.raises(Empty):
queue.get(timeout=0.1)
assert isinstance(received_event, ScheduleAdded)
- assert received_event.schedule_id == 'schedule1'
+ assert received_event.schedule_id == "schedule1"
def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
queue: Queue[Event] = Queue()
@@ -130,15 +146,19 @@ class TestEventBroker:
with pytest.raises(Empty):
queue.get(timeout=0.1)
- def test_publish_no_subscribers(self, broker: EventBroker, caplog: LogCaptureFixture) -> None:
+ def test_publish_no_subscribers(
+ self, broker: EventBroker, caplog: LogCaptureFixture
+ ) -> None:
with broker:
broker.publish(Event())
assert not caplog.text
- def test_publish_exception(self, broker: EventBroker, caplog: LogCaptureFixture) -> None:
+ def test_publish_exception(
+ self, broker: EventBroker, caplog: LogCaptureFixture
+ ) -> None:
def bad_subscriber(event: Event) -> None:
- raise Exception('foo')
+ raise Exception("foo")
timestamp = datetime.now(timezone.utc)
event_future: Future[Event] = Future()
@@ -150,7 +170,7 @@ class TestEventBroker:
event = event_future.result(3)
assert isinstance(event, Event)
assert event.timestamp == timestamp
- assert 'Error delivering Event' in caplog.text
+ assert "Error delivering Event" in caplog.text
@pytest.mark.anyio
@@ -161,8 +181,9 @@ class TestAsyncEventBroker:
async_broker.subscribe(send.send)
async_broker.subscribe(send.send_nowait)
event = ScheduleAdded(
- schedule_id='schedule1',
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
await async_broker.publish(event)
with fail_after(3):
@@ -172,20 +193,24 @@ class TestAsyncEventBroker:
assert event1 == event2
assert isinstance(event1, ScheduleAdded)
assert isinstance(event1.timestamp, datetime)
- assert event1.schedule_id == 'schedule1'
- assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+ assert event1.schedule_id == "schedule1"
+ assert event1.next_fire_time == datetime(
+ 2021, 9, 11, 12, 31, 56, 254867, timezone.utc
+ )
async def test_subscribe_one_shot(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream(2)
async with async_broker:
async_broker.subscribe(send.send, one_shot=True)
event = ScheduleAdded(
- schedule_id='schedule1',
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
await async_broker.publish(event)
event = ScheduleAdded(
- schedule_id='schedule2',
- next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc))
+ schedule_id="schedule2",
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
+ )
await async_broker.publish(event)
with fail_after(3):
@@ -195,7 +220,7 @@ class TestAsyncEventBroker:
await receive.receive()
assert isinstance(received_event, ScheduleAdded)
- assert received_event.schedule_id == 'schedule1'
+ assert received_event.schedule_id == "schedule1"
async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream()
@@ -210,17 +235,19 @@ class TestAsyncEventBroker:
with pytest.raises(TimeoutError), fail_after(0.1):
await receive.receive()
- async def test_publish_no_subscribers(self, async_broker: AsyncEventBroker,
- caplog: LogCaptureFixture) -> None:
+ async def test_publish_no_subscribers(
+ self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture
+ ) -> None:
async with async_broker:
await async_broker.publish(Event())
assert not caplog.text
- async def test_publish_exception(self, async_broker: AsyncEventBroker,
- caplog: LogCaptureFixture) -> None:
+ async def test_publish_exception(
+ self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture
+ ) -> None:
def bad_subscriber(event: Event) -> None:
- raise Exception('foo')
+ raise Exception("foo")
timestamp = datetime.now(timezone.utc)
send, receive = create_memory_object_stream()
@@ -231,4 +258,4 @@ class TestAsyncEventBroker:
received_event = await receive.receive()
assert received_event.timestamp == timestamp
- assert 'Error delivering Event' in caplog.text
+ assert "Error delivering Event" in caplog.text