diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-12 22:09:05 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-21 02:40:02 +0300 |
commit | c5727432736b55b7d76753307f14efdb962c2edf (patch) | |
tree | 005bd129694b56bd601d65c4cdf43828cfcd4381 /tests/test_schedulers.py | |
parent | 26c4db062145fcb4f623ecfda96c42ce2414e8e1 (diff) | |
download | apscheduler-c5727432736b55b7d76753307f14efdb962c2edf.tar.gz |
Major refactoring
- Made SyncScheduler a synchronous wrapper for AsyncScheduler
- Removed workers as a user interface
- Removed synchronous interfaces for data stores and event brokers and refactored existing implementations to use the async interface
- Added the current_async_scheduler contextvar
- Added job executors
Diffstat (limited to 'tests/test_schedulers.py')
-rw-r--r-- | tests/test_schedulers.py | 36 |
1 files changed, 15 insertions, 21 deletions
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 32c1c29..30c8dc9 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -26,15 +26,14 @@ from apscheduler import ( SchedulerStopped, Task, TaskAdded, + current_async_scheduler, current_job, current_scheduler, - current_worker, ) from apscheduler.schedulers.async_ import AsyncScheduler from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger -from apscheduler.workers.async_ import AsyncWorker if sys.version_info >= (3, 9): from zoneinfo import ZoneInfo @@ -70,7 +69,7 @@ class TestAsyncScheduler: received_events: list[Event] = [] event = anyio.Event() trigger = DateTrigger(datetime.now(timezone.utc)) - async with AsyncScheduler(start_worker=False) as scheduler: + async with AsyncScheduler(process_jobs=False) as scheduler: scheduler.event_broker.subscribe(listener) await scheduler.add_schedule(dummy_async_job, trigger, id="foo") await scheduler.start_in_background() @@ -111,7 +110,7 @@ class TestAsyncScheduler: assert not received_events async def test_add_get_schedule(self) -> None: - async with AsyncScheduler(start_worker=False) as scheduler: + async with AsyncScheduler(process_jobs=False) as scheduler: with pytest.raises(ScheduleLookupError): await scheduler.get_schedule("dummyid") @@ -121,7 +120,7 @@ class TestAsyncScheduler: assert isinstance(schedule, Schedule) async def test_add_get_schedules(self) -> None: - async with AsyncScheduler(start_worker=False) as scheduler: + async with AsyncScheduler(process_jobs=False) as scheduler: assert await scheduler.get_schedules() == [] schedule1_id = await scheduler.add_schedule( @@ -161,7 +160,7 @@ class TestAsyncScheduler: orig_start_time = datetime.now(timezone) - timedelta(seconds=1) fake_uniform = mocker.patch("random.uniform") fake_uniform.configure_mock(side_effect=lambda a, b: jitter) - async with AsyncScheduler(start_worker=False) as scheduler: + async with AsyncScheduler(process_jobs=False) as scheduler: trigger = IntervalTrigger(seconds=3, start_time=orig_start_time) job_added_event = anyio.Event() scheduler.event_broker.subscribe(job_added_listener, {JobAdded}) @@ -263,8 +262,7 @@ class TestAsyncScheduler: async def test_contextvars(self) -> None: def check_contextvars() -> None: - assert current_scheduler.get() is scheduler - assert isinstance(current_worker.get(), AsyncWorker) + assert current_async_scheduler.get() is scheduler info = current_job.get() assert info.task_id == "task_id" assert info.schedule_id == "foo" @@ -277,7 +275,7 @@ class TestAsyncScheduler: start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10) async with AsyncScheduler() as scheduler: await scheduler.data_store.add_task( - Task(id="task_id", func=check_contextvars) + Task(id="task_id", func=check_contextvars, executor="async") ) job = Job( task_id="task_id", @@ -300,10 +298,7 @@ class TestAsyncScheduler: async def test_wait_until_stopped(self) -> None: async with AsyncScheduler() as scheduler: - trigger = DateTrigger( - datetime.now(timezone.utc) + timedelta(milliseconds=100) - ) - await scheduler.add_schedule(scheduler.stop, trigger) + await scheduler.add_job(scheduler.stop) await scheduler.wait_until_stopped() # This should be a no-op @@ -422,7 +417,7 @@ class TestSyncScheduler: # Check that the job was created with the proper amount of jitter in its # scheduled time - jobs = scheduler.data_store.get_jobs({job_id}) + jobs = scheduler._portal.call(scheduler.data_store.get_jobs, {job_id}) assert jobs[0].jitter == timedelta(seconds=jitter) assert jobs[0].scheduled_fire_time == orig_start_time + timedelta( seconds=jitter @@ -495,7 +490,6 @@ class TestSyncScheduler: def test_contextvars(self) -> None: def check_contextvars() -> None: assert current_scheduler.get() is scheduler - assert current_worker.get() is not None info = current_job.get() assert info.task_id == "task_id" assert info.schedule_id == "foo" @@ -507,7 +501,10 @@ class TestSyncScheduler: scheduled_fire_time = datetime.now(timezone.utc) start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10) with Scheduler() as scheduler: - scheduler.data_store.add_task(Task(id="task_id", func=check_contextvars)) + scheduler._portal.call( + scheduler.data_store.add_task, + Task(id="task_id", func=check_contextvars, executor="threadpool"), + ) job = Job( task_id="task_id", schedule_id="foo", @@ -517,7 +514,7 @@ class TestSyncScheduler: tags={"foo", "bar"}, result_expiration_time=timedelta(seconds=10), ) - scheduler.data_store.add_job(job) + scheduler._portal.call(scheduler.data_store.add_job, job) scheduler.start_in_background() result = scheduler.get_job_result(job.id) if result.outcome is JobOutcome.error: @@ -527,10 +524,7 @@ class TestSyncScheduler: def test_wait_until_stopped(self) -> None: with Scheduler() as scheduler: - trigger = DateTrigger( - datetime.now(timezone.utc) + timedelta(milliseconds=100) - ) - scheduler.add_schedule(scheduler.stop, trigger) + scheduler.add_job(scheduler.stop) scheduler.start_in_background() scheduler.wait_until_stopped() |