summaryrefslogtreecommitdiff
path: root/tests/test_schedulers.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_schedulers.py')
-rw-r--r--tests/test_schedulers.py36
1 files changed, 15 insertions, 21 deletions
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 32c1c29..30c8dc9 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -26,15 +26,14 @@ from apscheduler import (
SchedulerStopped,
Task,
TaskAdded,
+ current_async_scheduler,
current_job,
current_scheduler,
- current_worker,
)
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
-from apscheduler.workers.async_ import AsyncWorker
if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo
@@ -70,7 +69,7 @@ class TestAsyncScheduler:
received_events: list[Event] = []
event = anyio.Event()
trigger = DateTrigger(datetime.now(timezone.utc))
- async with AsyncScheduler(start_worker=False) as scheduler:
+ async with AsyncScheduler(process_jobs=False) as scheduler:
scheduler.event_broker.subscribe(listener)
await scheduler.add_schedule(dummy_async_job, trigger, id="foo")
await scheduler.start_in_background()
@@ -111,7 +110,7 @@ class TestAsyncScheduler:
assert not received_events
async def test_add_get_schedule(self) -> None:
- async with AsyncScheduler(start_worker=False) as scheduler:
+ async with AsyncScheduler(process_jobs=False) as scheduler:
with pytest.raises(ScheduleLookupError):
await scheduler.get_schedule("dummyid")
@@ -121,7 +120,7 @@ class TestAsyncScheduler:
assert isinstance(schedule, Schedule)
async def test_add_get_schedules(self) -> None:
- async with AsyncScheduler(start_worker=False) as scheduler:
+ async with AsyncScheduler(process_jobs=False) as scheduler:
assert await scheduler.get_schedules() == []
schedule1_id = await scheduler.add_schedule(
@@ -161,7 +160,7 @@ class TestAsyncScheduler:
orig_start_time = datetime.now(timezone) - timedelta(seconds=1)
fake_uniform = mocker.patch("random.uniform")
fake_uniform.configure_mock(side_effect=lambda a, b: jitter)
- async with AsyncScheduler(start_worker=False) as scheduler:
+ async with AsyncScheduler(process_jobs=False) as scheduler:
trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
job_added_event = anyio.Event()
scheduler.event_broker.subscribe(job_added_listener, {JobAdded})
@@ -263,8 +262,7 @@ class TestAsyncScheduler:
async def test_contextvars(self) -> None:
def check_contextvars() -> None:
- assert current_scheduler.get() is scheduler
- assert isinstance(current_worker.get(), AsyncWorker)
+ assert current_async_scheduler.get() is scheduler
info = current_job.get()
assert info.task_id == "task_id"
assert info.schedule_id == "foo"
@@ -277,7 +275,7 @@ class TestAsyncScheduler:
start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10)
async with AsyncScheduler() as scheduler:
await scheduler.data_store.add_task(
- Task(id="task_id", func=check_contextvars)
+ Task(id="task_id", func=check_contextvars, executor="async")
)
job = Job(
task_id="task_id",
@@ -300,10 +298,7 @@ class TestAsyncScheduler:
async def test_wait_until_stopped(self) -> None:
async with AsyncScheduler() as scheduler:
- trigger = DateTrigger(
- datetime.now(timezone.utc) + timedelta(milliseconds=100)
- )
- await scheduler.add_schedule(scheduler.stop, trigger)
+ await scheduler.add_job(scheduler.stop)
await scheduler.wait_until_stopped()
# This should be a no-op
@@ -422,7 +417,7 @@ class TestSyncScheduler:
# Check that the job was created with the proper amount of jitter in its
# scheduled time
- jobs = scheduler.data_store.get_jobs({job_id})
+ jobs = scheduler._portal.call(scheduler.data_store.get_jobs, {job_id})
assert jobs[0].jitter == timedelta(seconds=jitter)
assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(
seconds=jitter
@@ -495,7 +490,6 @@ class TestSyncScheduler:
def test_contextvars(self) -> None:
def check_contextvars() -> None:
assert current_scheduler.get() is scheduler
- assert current_worker.get() is not None
info = current_job.get()
assert info.task_id == "task_id"
assert info.schedule_id == "foo"
@@ -507,7 +501,10 @@ class TestSyncScheduler:
scheduled_fire_time = datetime.now(timezone.utc)
start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10)
with Scheduler() as scheduler:
- scheduler.data_store.add_task(Task(id="task_id", func=check_contextvars))
+ scheduler._portal.call(
+ scheduler.data_store.add_task,
+ Task(id="task_id", func=check_contextvars, executor="threadpool"),
+ )
job = Job(
task_id="task_id",
schedule_id="foo",
@@ -517,7 +514,7 @@ class TestSyncScheduler:
tags={"foo", "bar"},
result_expiration_time=timedelta(seconds=10),
)
- scheduler.data_store.add_job(job)
+ scheduler._portal.call(scheduler.data_store.add_job, job)
scheduler.start_in_background()
result = scheduler.get_job_result(job.id)
if result.outcome is JobOutcome.error:
@@ -527,10 +524,7 @@ class TestSyncScheduler:
def test_wait_until_stopped(self) -> None:
with Scheduler() as scheduler:
- trigger = DateTrigger(
- datetime.now(timezone.utc) + timedelta(milliseconds=100)
- )
- scheduler.add_schedule(scheduler.stop, trigger)
+ scheduler.add_job(scheduler.stop)
scheduler.start_in_background()
scheduler.wait_until_stopped()