diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-08-29 01:02:10 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-08-29 01:38:09 +0300 |
commit | b4d4724e95583b9f075a814319c3d5e8e5514a3e (patch) | |
tree | dd77fb25ded2ceb5a4f29221de69f19f469cfac0 /tests/test_schedulers.py | |
parent | cf77aec5326e42af7b89e4ab2712daf9694ebad9 (diff) | |
download | apscheduler-b4d4724e95583b9f075a814319c3d5e8e5514a3e.tar.gz |
Overhauled the data store and event dispatch systems
Diffstat (limited to 'tests/test_schedulers.py')
-rw-r--r-- | tests/test_schedulers.py | 123 |
1 files changed, 89 insertions, 34 deletions
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 3e125c0..5829f30 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -1,10 +1,14 @@ -import logging +import threading from datetime import datetime, timezone +from typing import List +import anyio import pytest -from apscheduler.events import JobSuccessful +from anyio import fail_after +from apscheduler.events import ( + Event, JobAdded, ScheduleAdded, ScheduleRemoved, SchedulerStarted, SchedulerStopped) from apscheduler.schedulers.async_ import AsyncScheduler -from apscheduler.schedulers.sync import SyncScheduler +from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.date import DateTrigger pytestmark = pytest.mark.anyio @@ -19,40 +23,91 @@ def dummy_sync_job(): class TestAsyncScheduler: - async def test_schedule_job(self, caplog, store): - async def listener(event): - events.append(event) - if isinstance(event, JobSuccessful): - await scheduler.stop() + async def test_schedule_job(self) -> None: + def listener(received_event: Event) -> None: + received_events.append(received_event) + if len(received_events) == 4: + event.set() - caplog.set_level(logging.DEBUG) + received_events: List[Event] = [] + event = anyio.Event() + scheduler = AsyncScheduler(start_worker=False) + scheduler.subscribe(listener) trigger = DateTrigger(datetime.now(timezone.utc)) - events = [] - async with AsyncScheduler(store) as scheduler: - scheduler.worker.subscribe(listener) - await scheduler.add_schedule(dummy_async_job, trigger) - await scheduler.wait_until_stopped() + async with scheduler: + await scheduler.add_schedule(dummy_async_job, trigger, id='foo') + with fail_after(3): + await event.wait() - assert len(events) == 2 - assert isinstance(events[1], JobSuccessful) - assert events[1].return_value == 'returnvalue' + # The scheduler was first started + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStarted) + + # Then a schedule was added + received_event = received_events.pop(0) + assert isinstance(received_event, ScheduleAdded) + assert received_event.schedule_id == 'foo' + # assert received_event.task_id == 'task_id' + + # Then that schedule was processed and a job was added for it + received_event = received_events.pop(0) + assert isinstance(received_event, JobAdded) + assert received_event.schedule_id == 'foo' + assert received_event.task_id == 'test_schedulers:dummy_async_job' + + # Then the schedule was removed since the trigger had been exhausted + received_event = received_events.pop(0) + assert isinstance(received_event, ScheduleRemoved) + assert received_event.schedule_id == 'foo' + + # Finally, the scheduler was stopped + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStopped) + + # There should be no more events on the list + assert not received_events class TestSyncScheduler: - @pytest.mark.parametrize('anyio_backend', ['asyncio']) - def test_schedule_job(self, caplog, anyio_backend, sync_store, portal): - def listener(event): - events.append(event) - if isinstance(event, JobSuccessful): - scheduler.stop() - - caplog.set_level(logging.DEBUG) - events = [] - with SyncScheduler(sync_store, portal=portal) as scheduler: - scheduler.worker.subscribe(listener) - scheduler.add_schedule(dummy_sync_job, DateTrigger(datetime.now(timezone.utc))) - scheduler.wait_until_stopped() - - assert len(events) == 2 - assert isinstance(events[1], JobSuccessful) - assert events[1].return_value == 'returnvalue' + def test_schedule_job(self): + def listener(received_event: Event) -> None: + received_events.append(received_event) + if len(received_events) == 4: + event.set() + + received_events: List[Event] = [] + event = threading.Event() + scheduler = Scheduler(start_worker=False) + scheduler.subscribe(listener) + trigger = DateTrigger(datetime.now(timezone.utc)) + with scheduler: + scheduler.add_schedule(dummy_sync_job, trigger, id='foo') + event.wait(3) + + # The scheduler was first started + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStarted) + + # Then a schedule was added + received_event = received_events.pop(0) + assert isinstance(received_event, ScheduleAdded) + assert received_event.schedule_id == 'foo' + # assert received_event.task_id == 'task_id' + + # Then that schedule was processed and a job was added for it + received_event = received_events.pop(0) + assert isinstance(received_event, JobAdded) + assert received_event.schedule_id == 'foo' + assert received_event.task_id == 'test_schedulers:dummy_sync_job' + + # Then the schedule was removed since the trigger had been exhausted + received_event = received_events.pop(0) + assert isinstance(received_event, ScheduleRemoved) + assert received_event.schedule_id == 'foo' + + # Finally, the scheduler was stopped + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStopped) + + # There should be no more events on the list + assert not received_events |