summaryrefslogtreecommitdiff
path: root/tests/test_schedulers.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-08-29 01:02:10 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-08-29 01:38:09 +0300
commitb4d4724e95583b9f075a814319c3d5e8e5514a3e (patch)
treedd77fb25ded2ceb5a4f29221de69f19f469cfac0 /tests/test_schedulers.py
parentcf77aec5326e42af7b89e4ab2712daf9694ebad9 (diff)
downloadapscheduler-b4d4724e95583b9f075a814319c3d5e8e5514a3e.tar.gz
Overhauled the data store and event dispatch systems
Diffstat (limited to 'tests/test_schedulers.py')
-rw-r--r--tests/test_schedulers.py123
1 files changed, 89 insertions, 34 deletions
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 3e125c0..5829f30 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -1,10 +1,14 @@
-import logging
+import threading
from datetime import datetime, timezone
+from typing import List
+import anyio
import pytest
-from apscheduler.events import JobSuccessful
+from anyio import fail_after
+from apscheduler.events import (
+ Event, JobAdded, ScheduleAdded, ScheduleRemoved, SchedulerStarted, SchedulerStopped)
from apscheduler.schedulers.async_ import AsyncScheduler
-from apscheduler.schedulers.sync import SyncScheduler
+from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.date import DateTrigger
pytestmark = pytest.mark.anyio
@@ -19,40 +23,91 @@ def dummy_sync_job():
class TestAsyncScheduler:
- async def test_schedule_job(self, caplog, store):
- async def listener(event):
- events.append(event)
- if isinstance(event, JobSuccessful):
- await scheduler.stop()
+ async def test_schedule_job(self) -> None:
+ def listener(received_event: Event) -> None:
+ received_events.append(received_event)
+ if len(received_events) == 4:
+ event.set()
- caplog.set_level(logging.DEBUG)
+ received_events: List[Event] = []
+ event = anyio.Event()
+ scheduler = AsyncScheduler(start_worker=False)
+ scheduler.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
- events = []
- async with AsyncScheduler(store) as scheduler:
- scheduler.worker.subscribe(listener)
- await scheduler.add_schedule(dummy_async_job, trigger)
- await scheduler.wait_until_stopped()
+ async with scheduler:
+ await scheduler.add_schedule(dummy_async_job, trigger, id='foo')
+ with fail_after(3):
+ await event.wait()
- assert len(events) == 2
- assert isinstance(events[1], JobSuccessful)
- assert events[1].return_value == 'returnvalue'
+ # The scheduler was first started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStarted)
+
+ # Then a schedule was added
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, ScheduleAdded)
+ assert received_event.schedule_id == 'foo'
+ # assert received_event.task_id == 'task_id'
+
+ # Then that schedule was processed and a job was added for it
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAdded)
+ assert received_event.schedule_id == 'foo'
+ assert received_event.task_id == 'test_schedulers:dummy_async_job'
+
+ # Then the schedule was removed since the trigger had been exhausted
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, ScheduleRemoved)
+ assert received_event.schedule_id == 'foo'
+
+ # Finally, the scheduler was stopped
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStopped)
+
+ # There should be no more events on the list
+ assert not received_events
class TestSyncScheduler:
- @pytest.mark.parametrize('anyio_backend', ['asyncio'])
- def test_schedule_job(self, caplog, anyio_backend, sync_store, portal):
- def listener(event):
- events.append(event)
- if isinstance(event, JobSuccessful):
- scheduler.stop()
-
- caplog.set_level(logging.DEBUG)
- events = []
- with SyncScheduler(sync_store, portal=portal) as scheduler:
- scheduler.worker.subscribe(listener)
- scheduler.add_schedule(dummy_sync_job, DateTrigger(datetime.now(timezone.utc)))
- scheduler.wait_until_stopped()
-
- assert len(events) == 2
- assert isinstance(events[1], JobSuccessful)
- assert events[1].return_value == 'returnvalue'
+ def test_schedule_job(self):
+ def listener(received_event: Event) -> None:
+ received_events.append(received_event)
+ if len(received_events) == 4:
+ event.set()
+
+ received_events: List[Event] = []
+ event = threading.Event()
+ scheduler = Scheduler(start_worker=False)
+ scheduler.subscribe(listener)
+ trigger = DateTrigger(datetime.now(timezone.utc))
+ with scheduler:
+ scheduler.add_schedule(dummy_sync_job, trigger, id='foo')
+ event.wait(3)
+
+ # The scheduler was first started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStarted)
+
+ # Then a schedule was added
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, ScheduleAdded)
+ assert received_event.schedule_id == 'foo'
+ # assert received_event.task_id == 'task_id'
+
+ # Then that schedule was processed and a job was added for it
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAdded)
+ assert received_event.schedule_id == 'foo'
+ assert received_event.task_id == 'test_schedulers:dummy_sync_job'
+
+ # Then the schedule was removed since the trigger had been exhausted
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, ScheduleRemoved)
+ assert received_event.schedule_id == 'foo'
+
+ # Finally, the scheduler was stopped
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStopped)
+
+ # There should be no more events on the list
+ assert not received_events