From f215c1ab45959095f6b499eb7b26356c5937ee8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 9 Jun 2022 12:40:55 +0300 Subject: Added support for starting the sync scheduler (and worker) without the context manager --- tests/test_datastores.py | 947 +++++++++++++++++++++++++++++++++------------ tests/test_eventbrokers.py | 189 +++++---- tests/test_schedulers.py | 40 +- tests/test_workers.py | 63 +-- 4 files changed, 820 insertions(+), 419 deletions(-) (limited to 'tests') diff --git a/tests/test_datastores.py b/tests/test_datastores.py index d6376ea..ea5416e 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -1,19 +1,31 @@ from __future__ import annotations -from contextlib import asynccontextmanager +import threading +from collections.abc import Generator +from contextlib import asynccontextmanager, contextmanager from datetime import datetime, timezone from tempfile import TemporaryDirectory -from typing import AsyncGenerator +from typing import Any, AsyncGenerator, cast import anyio import pytest +from _pytest.fixtures import SubRequest from freezegun.api import FrozenDateTimeFactory from pytest_lazyfixture import lazy_fixture -from apscheduler.abc import AsyncDataStore, DataStore, Job, Schedule +from apscheduler.abc import ( + AsyncDataStore, + AsyncEventBroker, + DataStore, + EventBroker, + Job, + Schedule, +) from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter from apscheduler.datastores.memory import MemoryDataStore from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome +from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker +from apscheduler.eventbrokers.local import LocalEventBroker from apscheduler.events import ( Event, ScheduleAdded, @@ -31,6 +43,12 @@ def memory_store() -> DataStore: yield MemoryDataStore() +@pytest.fixture +def adapted_memory_store() -> AsyncDataStore: + store = MemoryDataStore() + return AsyncDataStoreAdapter(store) + + @pytest.fixture def mongodb_store() -> DataStore: from pymongo import MongoClient @@ -96,39 +114,6 @@ async def asyncpg_store() -> AsyncDataStore: await engine.dispose() -@pytest.fixture( - params=[ - pytest.param(lazy_fixture("memory_store"), id="memory"), - pytest.param(lazy_fixture("sqlite_store"), id="sqlite"), - pytest.param( - lazy_fixture("mongodb_store"), - id="mongodb", - marks=[pytest.mark.external_service], - ), - pytest.param( - lazy_fixture("psycopg2_store"), - id="psycopg2", - marks=[pytest.mark.external_service], - ), - pytest.param( - lazy_fixture("asyncpg_store"), - id="asyncpg", - marks=[pytest.mark.external_service], - ), - pytest.param( - lazy_fixture("mysql_store"), - id="mysql", - marks=[pytest.mark.external_service], - ), - ] -) -async def datastore(request): - if isinstance(request.param, DataStore): - return AsyncDataStoreAdapter(request.param) - else: - return request.param - - @pytest.fixture def schedules() -> list[Schedule]: trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) @@ -144,32 +129,504 @@ def schedules() -> list[Schedule]: return [schedule1, schedule2, schedule3] -@asynccontextmanager -async def capture_events( - datastore: AsyncDataStore, limit: int, event_types: set[type[Event]] | None = None -) -> AsyncGenerator[list[Event], None]: - def listener(event: Event) -> None: - events.append(event) - if len(events) == limit: - limit_event.set() - subscription.unsubscribe() +class TestDataStores: + @contextmanager + def capture_events( + self, + datastore: DataStore, + limit: int, + event_types: set[type[Event]] | None = None, + ) -> Generator[list[Event], None, None]: + def listener(event: Event) -> None: + events.append(event) + if len(events) == limit: + limit_event.set() + subscription.unsubscribe() + + events: list[Event] = [] + limit_event = threading.Event() + subscription = datastore.events.subscribe(listener, event_types) + yield events + if limit: + limit_event.wait(2) + + @pytest.fixture + def event_broker(self) -> Generator[EventBroker, Any, None]: + broker = LocalEventBroker() + broker.start() + yield broker + broker.stop() + + @pytest.fixture( + params=[ + pytest.param(lazy_fixture("memory_store"), id="memory"), + pytest.param(lazy_fixture("sqlite_store"), id="sqlite"), + pytest.param( + lazy_fixture("mongodb_store"), + id="mongodb", + marks=[pytest.mark.external_service], + ), + pytest.param( + lazy_fixture("psycopg2_store"), + id="psycopg2", + marks=[pytest.mark.external_service], + ), + pytest.param( + lazy_fixture("mysql_store"), + id="mysql", + marks=[pytest.mark.external_service], + ), + ] + ) + def datastore( + self, request: SubRequest, event_broker: EventBroker + ) -> Generator[DataStore, Any, None]: + datastore = cast(DataStore, request.param) + datastore.start(event_broker) + yield datastore + datastore.stop() + + def test_add_replace_task(self, datastore: DataStore) -> None: + import math - events: list[Event] = [] - limit_event = anyio.Event() - subscription = datastore.events.subscribe(listener, event_types) - yield events - if limit: - with anyio.fail_after(3): - await limit_event.wait() + event_types = {TaskAdded, TaskUpdated} + with self.capture_events(datastore, 3, event_types) as events: + datastore.add_task(Task(id="test_task", func=print)) + datastore.add_task(Task(id="test_task2", func=math.ceil)) + datastore.add_task(Task(id="test_task", func=repr)) + + tasks = datastore.get_tasks() + assert len(tasks) == 2 + assert tasks[0].id == "test_task" + assert tasks[0].func is repr + assert tasks[1].id == "test_task2" + assert tasks[1].func is math.ceil + + received_event = events.pop(0) + assert isinstance(received_event, TaskAdded) + assert received_event.task_id == "test_task" + + received_event = events.pop(0) + assert isinstance(received_event, TaskAdded) + assert received_event.task_id == "test_task2" + + received_event = events.pop(0) + assert isinstance(received_event, TaskUpdated) + assert received_event.task_id == "test_task" + + assert not events + + def test_add_schedules( + self, datastore: DataStore, schedules: list[Schedule] + ) -> None: + with self.capture_events(datastore, 3, {ScheduleAdded}) as events: + for schedule in schedules: + datastore.add_schedule(schedule, ConflictPolicy.exception) + + assert datastore.get_schedules() == schedules + assert datastore.get_schedules({"s1", "s2", "s3"}) == schedules + assert datastore.get_schedules({"s1"}) == [schedules[0]] + assert datastore.get_schedules({"s2"}) == [schedules[1]] + assert datastore.get_schedules({"s3"}) == [schedules[2]] + + for event, schedule in zip(events, schedules): + assert event.schedule_id == schedule.id + assert event.next_fire_time == schedule.next_fire_time + + def test_replace_schedules( + self, datastore: DataStore, schedules: list[Schedule] + ) -> None: + with self.capture_events(datastore, 1, {ScheduleUpdated}) as events: + for schedule in schedules: + datastore.add_schedule(schedule, ConflictPolicy.exception) + + next_fire_time = schedules[2].trigger.next() + schedule = Schedule( + id="s3", + task_id="foo", + trigger=schedules[2].trigger, + args=(), + kwargs={}, + coalesce=CoalescePolicy.earliest, + misfire_grace_time=None, + tags=frozenset(), + ) + schedule.next_fire_time = next_fire_time + datastore.add_schedule(schedule, ConflictPolicy.replace) + + schedules = datastore.get_schedules({schedule.id}) + assert schedules[0].task_id == "foo" + assert schedules[0].next_fire_time == next_fire_time + assert schedules[0].args == () + assert schedules[0].kwargs == {} + assert schedules[0].coalesce is CoalescePolicy.earliest + assert schedules[0].misfire_grace_time is None + assert schedules[0].tags == frozenset() + + received_event = events.pop(0) + assert received_event.schedule_id == "s3" + assert received_event.next_fire_time == datetime( + 2020, 9, 15, tzinfo=timezone.utc + ) + assert not events + + def test_remove_schedules( + self, datastore: DataStore, schedules: list[Schedule] + ) -> None: + with self.capture_events(datastore, 2, {ScheduleRemoved}) as events: + for schedule in schedules: + datastore.add_schedule(schedule, ConflictPolicy.exception) + + datastore.remove_schedules(["s1", "s2"]) + assert datastore.get_schedules() == [schedules[2]] + + received_event = events.pop(0) + assert received_event.schedule_id == "s1" + + received_event = events.pop(0) + assert received_event.schedule_id == "s2" + + assert not events + + @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc)) + def test_acquire_release_schedules( + self, datastore: DataStore, schedules: list[Schedule] + ) -> None: + event_types = {ScheduleRemoved, ScheduleUpdated} + with self.capture_events(datastore, 2, event_types) as events: + for schedule in schedules: + datastore.add_schedule(schedule, ConflictPolicy.exception) + + # The first scheduler gets the first due schedule + schedules1 = datastore.acquire_schedules("dummy-id1", 1) + assert len(schedules1) == 1 + assert schedules1[0].id == "s1" + + # The second scheduler gets the second due schedule + schedules2 = datastore.acquire_schedules("dummy-id2", 1) + assert len(schedules2) == 1 + assert schedules2[0].id == "s2" + + # The third scheduler gets nothing + schedules3 = datastore.acquire_schedules("dummy-id3", 1) + assert not schedules3 + + # Update the schedules and check that the job store actually deletes the first + # one and updates the second one + schedules1[0].next_fire_time = None + schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc) + + # Release all the schedules + datastore.release_schedules("dummy-id1", schedules1) + datastore.release_schedules("dummy-id2", schedules2) + + # Check that the first schedule is gone + schedules = datastore.get_schedules() + assert len(schedules) == 2 + assert schedules[0].id == "s2" + assert schedules[1].id == "s3" + + # Check for the appropriate update and delete events + received_event = events.pop(0) + assert isinstance(received_event, ScheduleRemoved) + assert received_event.schedule_id == "s1" + + received_event = events.pop(0) + assert isinstance(received_event, ScheduleUpdated) + assert received_event.schedule_id == "s2" + assert received_event.next_fire_time == datetime( + 2020, 9, 15, tzinfo=timezone.utc + ) + + assert not events + + def test_release_schedule_two_identical_fire_times( + self, datastore: DataStore + ) -> None: + """Regression test for #616.""" + for i in range(1, 3): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + datastore.add_schedule(schedule, ConflictPolicy.exception) + + schedules = datastore.acquire_schedules("foo", 3) + schedules[0].next_fire_time = None + datastore.release_schedules("foo", schedules) + + remaining = datastore.get_schedules({s.id for s in schedules}) + assert len(remaining) == 1 + assert remaining[0].id == schedules[1].id + + def test_release_two_schedules_at_once(self, datastore: DataStore) -> None: + """Regression test for #621.""" + for i in range(2): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + datastore.add_schedule(schedule, ConflictPolicy.exception) + + schedules = datastore.acquire_schedules("foo", 3) + datastore.release_schedules("foo", schedules) + + remaining = datastore.get_schedules({s.id for s in schedules}) + assert len(remaining) == 2 + + def test_acquire_schedules_lock_timeout( + self, datastore: DataStore, schedules: list[Schedule], freezer + ) -> None: + """ + Test that a scheduler can acquire schedules that were acquired by another scheduler but + not released within the lock timeout period. + + """ + datastore.add_schedule(schedules[0], ConflictPolicy.exception) + + # First, one scheduler acquires the first available schedule + acquired1 = datastore.acquire_schedules("dummy-id1", 1) + assert len(acquired1) == 1 + assert acquired1[0].id == "s1" + + # Try to acquire the schedule just at the threshold (now == acquired_until). + # This should not yield any schedules. + freezer.tick(30) + acquired2 = datastore.acquire_schedules("dummy-id2", 1) + assert not acquired2 + + # Right after that, the schedule should be available + freezer.tick(1) + acquired3 = datastore.acquire_schedules("dummy-id2", 1) + assert len(acquired3) == 1 + assert acquired3[0].id == "s1" + + def test_acquire_multiple_workers(self, datastore: DataStore) -> None: + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + jobs = [Job(task_id="task1") for _ in range(2)] + for job in jobs: + datastore.add_job(job) + + # The first worker gets the first job in the queue + jobs1 = datastore.acquire_jobs("worker1", 1) + assert len(jobs1) == 1 + assert jobs1[0].id == jobs[0].id + + # The second worker gets the second job + jobs2 = datastore.acquire_jobs("worker2", 1) + assert len(jobs2) == 1 + assert jobs2[0].id == jobs[1].id + + # The third worker gets nothing + jobs3 = datastore.acquire_jobs("worker3", 1) + assert not jobs3 + + def test_job_release_success(self, datastore: DataStore) -> None: + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + acquired = datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult( + job_id=acquired[0].id, + outcome=JobOutcome.success, + return_value="foo", + ), + ) + result = datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.success + assert result.exception is None + assert result.return_value == "foo" + + # Check that the job and its result are gone + assert not datastore.get_jobs({acquired[0].id}) + assert not datastore.get_job_result(acquired[0].id) + + def test_job_release_failure(self, datastore: DataStore) -> None: + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + acquired = datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult( + job_id=acquired[0].id, + outcome=JobOutcome.error, + exception=ValueError("foo"), + ), + ) + result = datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.error + assert isinstance(result.exception, ValueError) + assert result.exception.args == ("foo",) + assert result.return_value is None + + # Check that the job and its result are gone + assert not datastore.get_jobs({acquired[0].id}) + assert not datastore.get_job_result(acquired[0].id) + + def test_job_release_missed_deadline(self, datastore: DataStore): + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + acquired = datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + ) + result = datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.missed_start_deadline + assert result.exception is None + assert result.return_value is None + + # Check that the job and its result are gone + assert not datastore.get_jobs({acquired[0].id}) + assert not datastore.get_job_result(acquired[0].id) + + def test_job_release_cancelled(self, datastore: DataStore) -> None: + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + acquired = datastore.acquire_jobs("worker1", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + datastore.release_job( + "worker1", + acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + ) + result = datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.cancelled + assert result.exception is None + assert result.return_value is None + + # Check that the job and its result are gone + assert not datastore.get_jobs({acquired[0].id}) + assert not datastore.get_job_result(acquired[0].id) + + def test_acquire_jobs_lock_timeout( + self, datastore: DataStore, freezer: FrozenDateTimeFactory + ) -> None: + """ + Test that a worker can acquire jobs that were acquired by another scheduler but not + released within the lock timeout period. + + """ + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + # First, one worker acquires the first available job + acquired = datastore.acquire_jobs("worker1", 1) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + # Try to acquire the job just at the threshold (now == acquired_until). + # This should not yield any jobs. + freezer.tick(30) + assert not datastore.acquire_jobs("worker2", 1) + + # Right after that, the job should be available + freezer.tick(1) + acquired = datastore.acquire_jobs("worker2", 1) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + def test_acquire_jobs_max_number_exceeded(self, datastore: DataStore) -> None: + datastore.add_task( + Task(id="task1", func=asynccontextmanager, max_running_jobs=2) + ) + jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")] + for job in jobs: + datastore.add_job(job) + + # Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3 + acquired_jobs = datastore.acquire_jobs("worker1", 3) + assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]] + + # Release one job, and the worker should be able to acquire the third job + datastore.release_job( + "worker1", + acquired_jobs[0].task_id, + JobResult( + job_id=acquired_jobs[0].id, + outcome=JobOutcome.success, + return_value=None, + ), + ) + acquired_jobs = datastore.acquire_jobs("worker1", 3) + assert [job.id for job in acquired_jobs] == [jobs[2].id] @pytest.mark.anyio -class TestAsyncStores: +class TestAsyncDataStores: + @asynccontextmanager + async def capture_events( + self, + datastore: AsyncDataStore, + limit: int, + event_types: set[type[Event]] | None = None, + ) -> AsyncGenerator[list[Event], None]: + def listener(event: Event) -> None: + events.append(event) + if len(events) == limit: + limit_event.set() + subscription.unsubscribe() + + events: list[Event] = [] + limit_event = anyio.Event() + subscription = datastore.events.subscribe(listener, event_types) + yield events + if limit: + with anyio.fail_after(3): + await limit_event.wait() + + @pytest.fixture + async def event_broker(self) -> AsyncGenerator[AsyncEventBroker, Any]: + broker = LocalAsyncEventBroker() + await broker.start() + yield broker + await broker.stop() + + @pytest.fixture( + params=[ + pytest.param(lazy_fixture("adapted_memory_store"), id="memory"), + pytest.param( + lazy_fixture("asyncpg_store"), + id="asyncpg", + marks=[pytest.mark.external_service], + ), + ] + ) + async def datastore( + self, request: SubRequest, event_broker: AsyncEventBroker + ) -> AsyncGenerator[AsyncDataStore, Any]: + datastore = cast(AsyncDataStore, request.param) + await datastore.start(event_broker) + yield datastore + await datastore.stop() + async def test_add_replace_task(self, datastore: AsyncDataStore) -> None: import math event_types = {TaskAdded, TaskUpdated} - async with datastore, capture_events(datastore, 3, event_types) as events: + async with self.capture_events(datastore, 3, event_types) as events: await datastore.add_task(Task(id="test_task", func=print)) await datastore.add_task(Task(id="test_task2", func=math.ceil)) await datastore.add_task(Task(id="test_task", func=repr)) @@ -198,7 +655,7 @@ class TestAsyncStores: async def test_add_schedules( self, datastore: AsyncDataStore, schedules: list[Schedule] ) -> None: - async with datastore, capture_events(datastore, 3, {ScheduleAdded}) as events: + async with self.capture_events(datastore, 3, {ScheduleAdded}) as events: for schedule in schedules: await datastore.add_schedule(schedule, ConflictPolicy.exception) @@ -215,7 +672,7 @@ class TestAsyncStores: async def test_replace_schedules( self, datastore: AsyncDataStore, schedules: list[Schedule] ) -> None: - async with datastore, capture_events(datastore, 1, {ScheduleUpdated}) as events: + async with self.capture_events(datastore, 1, {ScheduleUpdated}) as events: for schedule in schedules: await datastore.add_schedule(schedule, ConflictPolicy.exception) @@ -252,7 +709,7 @@ class TestAsyncStores: async def test_remove_schedules( self, datastore: AsyncDataStore, schedules: list[Schedule] ) -> None: - async with datastore, capture_events(datastore, 2, {ScheduleRemoved}) as events: + async with self.capture_events(datastore, 2, {ScheduleRemoved}) as events: for schedule in schedules: await datastore.add_schedule(schedule, ConflictPolicy.exception) @@ -272,7 +729,7 @@ class TestAsyncStores: self, datastore: AsyncDataStore, schedules: list[Schedule] ) -> None: event_types = {ScheduleRemoved, ScheduleUpdated} - async with datastore, capture_events(datastore, 2, event_types) as events: + async with self.capture_events(datastore, 2, event_types) as events: for schedule in schedules: await datastore.add_schedule(schedule, ConflictPolicy.exception) @@ -323,16 +780,15 @@ class TestAsyncStores: self, datastore: AsyncDataStore ) -> None: """Regression test for #616.""" - async with datastore: - for i in range(1, 3): - trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) - schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) - schedule.next_fire_time = trigger.next() - await datastore.add_schedule(schedule, ConflictPolicy.exception) + for i in range(1, 3): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + await datastore.add_schedule(schedule, ConflictPolicy.exception) - schedules = await datastore.acquire_schedules("foo", 3) - schedules[0].next_fire_time = None - await datastore.release_schedules("foo", schedules) + schedules = await datastore.acquire_schedules("foo", 3) + schedules[0].next_fire_time = None + await datastore.release_schedules("foo", schedules) remaining = await datastore.get_schedules({s.id for s in schedules}) assert len(remaining) == 1 @@ -342,15 +798,14 @@ class TestAsyncStores: self, datastore: AsyncDataStore ) -> None: """Regression test for #621.""" - async with datastore: - for i in range(2): - trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) - schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) - schedule.next_fire_time = trigger.next() - await datastore.add_schedule(schedule, ConflictPolicy.exception) + for i in range(2): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + await datastore.add_schedule(schedule, ConflictPolicy.exception) - schedules = await datastore.acquire_schedules("foo", 3) - await datastore.release_schedules("foo", schedules) + schedules = await datastore.acquire_schedules("foo", 3) + await datastore.release_schedules("foo", schedules) remaining = await datastore.get_schedules({s.id for s in schedules}) assert len(remaining) == 2 @@ -363,153 +818,145 @@ class TestAsyncStores: not released within the lock timeout period. """ - async with datastore: - await datastore.add_schedule(schedules[0], ConflictPolicy.exception) - - # First, one scheduler acquires the first available schedule - acquired1 = await datastore.acquire_schedules("dummy-id1", 1) - assert len(acquired1) == 1 - assert acquired1[0].id == "s1" - - # Try to acquire the schedule just at the threshold (now == acquired_until). - # This should not yield any schedules. - freezer.tick(30) - acquired2 = await datastore.acquire_schedules("dummy-id2", 1) - assert not acquired2 - - # Right after that, the schedule should be available - freezer.tick(1) - acquired3 = await datastore.acquire_schedules("dummy-id2", 1) - assert len(acquired3) == 1 - assert acquired3[0].id == "s1" + await datastore.add_schedule(schedules[0], ConflictPolicy.exception) - async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None: - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - jobs = [Job(task_id="task1") for _ in range(2)] - for job in jobs: - await datastore.add_job(job) - - # The first worker gets the first job in the queue - jobs1 = await datastore.acquire_jobs("worker1", 1) - assert len(jobs1) == 1 - assert jobs1[0].id == jobs[0].id - - # The second worker gets the second job - jobs2 = await datastore.acquire_jobs("worker2", 1) - assert len(jobs2) == 1 - assert jobs2[0].id == jobs[1].id - - # The third worker gets nothing - jobs3 = await datastore.acquire_jobs("worker3", 1) - assert not jobs3 - - async def test_job_release_success(self, datastore: AsyncDataStore) -> None: - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") - await datastore.add_job(job) + # First, one scheduler acquires the first available schedule + acquired1 = await datastore.acquire_schedules("dummy-id1", 1) + assert len(acquired1) == 1 + assert acquired1[0].id == "s1" - acquired = await datastore.acquire_jobs("worker_id", 2) - assert len(acquired) == 1 - assert acquired[0].id == job.id - - await datastore.release_job( - "worker_id", - acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, - return_value="foo", - ), - ) - result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.success - assert result.exception is None - assert result.return_value == "foo" + # Try to acquire the schedule just at the threshold (now == acquired_until). + # This should not yield any schedules. + freezer.tick(30) + acquired2 = await datastore.acquire_schedules("dummy-id2", 1) + assert not acquired2 - # Check that the job and its result are gone - assert not await datastore.get_jobs({acquired[0].id}) - assert not await datastore.get_job_result(acquired[0].id) + # Right after that, the schedule should be available + freezer.tick(1) + acquired3 = await datastore.acquire_schedules("dummy-id2", 1) + assert len(acquired3) == 1 + assert acquired3[0].id == "s1" - async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None: + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + jobs = [Job(task_id="task1") for _ in range(2)] + for job in jobs: await datastore.add_job(job) - acquired = await datastore.acquire_jobs("worker_id", 2) - assert len(acquired) == 1 - assert acquired[0].id == job.id - - await datastore.release_job( - "worker_id", - acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, - exception=ValueError("foo"), - ), - ) - result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.error - assert isinstance(result.exception, ValueError) - assert result.exception.args == ("foo",) - assert result.return_value is None + # The first worker gets the first job in the queue + jobs1 = await datastore.acquire_jobs("worker1", 1) + assert len(jobs1) == 1 + assert jobs1[0].id == jobs[0].id - # Check that the job and its result are gone - assert not await datastore.get_jobs({acquired[0].id}) - assert not await datastore.get_job_result(acquired[0].id) + # The second worker gets the second job + jobs2 = await datastore.acquire_jobs("worker2", 1) + assert len(jobs2) == 1 + assert jobs2[0].id == jobs[1].id - async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") - await datastore.add_job(job) + # The third worker gets nothing + jobs3 = await datastore.acquire_jobs("worker3", 1) + assert not jobs3 + + async def test_job_release_success(self, datastore: AsyncDataStore) -> None: + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + acquired = await datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + await datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult( + job_id=acquired[0].id, + outcome=JobOutcome.success, + return_value="foo", + ), + ) + result = await datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.success + assert result.exception is None + assert result.return_value == "foo" - acquired = await datastore.acquire_jobs("worker_id", 2) - assert len(acquired) == 1 - assert acquired[0].id == job.id + # Check that the job and its result are gone + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - await datastore.release_job( - "worker_id", - acquired[0].task_id, - JobResult( - job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline - ), - ) - result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.missed_start_deadline - assert result.exception is None - assert result.return_value is None + async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + acquired = await datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + await datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult( + job_id=acquired[0].id, + outcome=JobOutcome.error, + exception=ValueError("foo"), + ), + ) + result = await datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.error + assert isinstance(result.exception, ValueError) + assert result.exception.args == ("foo",) + assert result.return_value is None - # Check that the job and its result are gone - assert not await datastore.get_jobs({acquired[0].id}) - assert not await datastore.get_job_result(acquired[0].id) + # Check that the job and its result are gone + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") - await datastore.add_job(job) + async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + acquired = await datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + await datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + ) + result = await datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.missed_start_deadline + assert result.exception is None + assert result.return_value is None - acquired = await datastore.acquire_jobs("worker1", 2) - assert len(acquired) == 1 - assert acquired[0].id == job.id + # Check that the job and its result are gone + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - await datastore.release_job( - "worker1", - acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), - ) - result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.cancelled - assert result.exception is None - assert result.return_value is None + async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + acquired = await datastore.acquire_jobs("worker1", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + await datastore.release_job( + "worker1", + acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + ) + result = await datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.cancelled + assert result.exception is None + assert result.return_value is None - # Check that the job and its result are gone - assert not await datastore.get_jobs({acquired[0].id}) - assert not await datastore.get_job_result(acquired[0].id) + # Check that the job and its result are gone + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) async def test_acquire_jobs_lock_timeout( self, datastore: AsyncDataStore, freezer: FrozenDateTimeFactory @@ -519,51 +966,49 @@ class TestAsyncStores: released within the lock timeout period. """ - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") - await datastore.add_job(job) - - # First, one worker acquires the first available job - acquired = await datastore.acquire_jobs("worker1", 1) - assert len(acquired) == 1 - assert acquired[0].id == job.id - - # Try to acquire the job just at the threshold (now == acquired_until). - # This should not yield any jobs. - freezer.tick(30) - assert not await datastore.acquire_jobs("worker2", 1) - - # Right after that, the job should be available - freezer.tick(1) - acquired = await datastore.acquire_jobs("worker2", 1) - assert len(acquired) == 1 - assert acquired[0].id == job.id + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + # First, one worker acquires the first available job + acquired = await datastore.acquire_jobs("worker1", 1) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + # Try to acquire the job just at the threshold (now == acquired_until). + # This should not yield any jobs. + freezer.tick(30) + assert not await datastore.acquire_jobs("worker2", 1) + + # Right after that, the job should be available + freezer.tick(1) + acquired = await datastore.acquire_jobs("worker2", 1) + assert len(acquired) == 1 + assert acquired[0].id == job.id async def test_acquire_jobs_max_number_exceeded( self, datastore: AsyncDataStore ) -> None: - async with datastore: - await datastore.add_task( - Task(id="task1", func=asynccontextmanager, max_running_jobs=2) - ) - jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")] - for job in jobs: - await datastore.add_job(job) - - # Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3 - acquired_jobs = await datastore.acquire_jobs("worker1", 3) - assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]] - - # Release one job, and the worker should be able to acquire the third job - await datastore.release_job( - "worker1", - acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, - return_value=None, - ), - ) - acquired_jobs = await datastore.acquire_jobs("worker1", 3) - assert [job.id for job in acquired_jobs] == [jobs[2].id] + await datastore.add_task( + Task(id="task1", func=asynccontextmanager, max_running_jobs=2) + ) + jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")] + for job in jobs: + await datastore.add_job(job) + + # Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3 + acquired_jobs = await datastore.acquire_jobs("worker1", 3) + assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]] + + # Release one job, and the worker should be able to acquire the third job + await datastore.release_job( + "worker1", + acquired_jobs[0].task_id, + JobResult( + job_id=acquired_jobs[0].id, + outcome=JobOutcome.success, + return_value=None, + ), + ) + acquired_jobs = await datastore.acquire_jobs("worker1", 3) + assert [job.id for job in acquired_jobs] == [jobs[2].id] diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py index 7ec90ab..ffa4be7 100644 --- a/tests/test_eventbrokers.py +++ b/tests/test_eventbrokers.py @@ -1,9 +1,10 @@ from __future__ import annotations +from collections.abc import AsyncGenerator, Generator from concurrent.futures import Future from datetime import datetime, timezone from queue import Empty, Queue -from typing import Callable +from typing import Any import pytest from _pytest.fixtures import SubRequest @@ -73,8 +74,10 @@ async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker: ), ] ) -def broker(request: SubRequest) -> Callable[[], EventBroker]: - return request.param +def broker(request: SubRequest) -> Generator[EventBroker, Any, None]: + request.param.start() + yield request.param + request.param.stop() @pytest.fixture( @@ -87,23 +90,24 @@ def broker(request: SubRequest) -> Callable[[], EventBroker]: ), ] ) -def async_broker(request: SubRequest) -> Callable[[], AsyncEventBroker]: - return request.param +async def async_broker(request: SubRequest) -> AsyncGenerator[AsyncEventBroker, Any]: + await request.param.start() + yield request.param + await request.param.stop() class TestEventBroker: def test_publish_subscribe(self, broker: EventBroker) -> None: queue: Queue[Event] = Queue() - with broker: - broker.subscribe(queue.put_nowait) - broker.subscribe(queue.put_nowait) - event = ScheduleAdded( - schedule_id="schedule1", - next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), - ) - broker.publish(event) - event1 = queue.get(timeout=3) - event2 = queue.get(timeout=1) + broker.subscribe(queue.put_nowait) + broker.subscribe(queue.put_nowait) + event = ScheduleAdded( + schedule_id="schedule1", + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), + ) + broker.publish(event) + event1 = queue.get(timeout=3) + event2 = queue.get(timeout=1) assert event1 == event2 assert isinstance(event1, ScheduleAdded) @@ -115,43 +119,39 @@ class TestEventBroker: def test_subscribe_one_shot(self, broker: EventBroker) -> None: queue: Queue[Event] = Queue() - with broker: - broker.subscribe(queue.put_nowait, one_shot=True) - event = ScheduleAdded( - schedule_id="schedule1", - next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), - ) - broker.publish(event) - event = ScheduleAdded( - schedule_id="schedule2", - next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc), - ) - broker.publish(event) - received_event = queue.get(timeout=3) - with pytest.raises(Empty): - queue.get(timeout=0.1) + broker.subscribe(queue.put_nowait, one_shot=True) + event = ScheduleAdded( + schedule_id="schedule1", + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), + ) + broker.publish(event) + event = ScheduleAdded( + schedule_id="schedule2", + next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc), + ) + broker.publish(event) + received_event = queue.get(timeout=3) + with pytest.raises(Empty): + queue.get(timeout=0.1) assert isinstance(received_event, ScheduleAdded) assert received_event.schedule_id == "schedule1" def test_unsubscribe(self, broker: EventBroker, caplog) -> None: queue: Queue[Event] = Queue() - with broker: - subscription = broker.subscribe(queue.put_nowait) - broker.publish(Event()) - queue.get(timeout=3) + subscription = broker.subscribe(queue.put_nowait) + broker.publish(Event()) + queue.get(timeout=3) - subscription.unsubscribe() - broker.publish(Event()) - with pytest.raises(Empty): - queue.get(timeout=0.1) + subscription.unsubscribe() + broker.publish(Event()) + with pytest.raises(Empty): + queue.get(timeout=0.1) def test_publish_no_subscribers( self, broker: EventBroker, caplog: LogCaptureFixture ) -> None: - with broker: - broker.publish(Event()) - + broker.publish(Event()) assert not caplog.text def test_publish_exception( @@ -162,33 +162,31 @@ class TestEventBroker: timestamp = datetime.now(timezone.utc) event_future: Future[Event] = Future() - with broker: - broker.subscribe(bad_subscriber) - broker.subscribe(event_future.set_result) - broker.publish(Event(timestamp=timestamp)) + broker.subscribe(bad_subscriber) + broker.subscribe(event_future.set_result) + broker.publish(Event(timestamp=timestamp)) - event = event_future.result(3) - assert isinstance(event, Event) - assert event.timestamp == timestamp - assert "Error delivering Event" in caplog.text + event = event_future.result(3) + assert isinstance(event, Event) + assert event.timestamp == timestamp + assert "Error delivering Event" in caplog.text @pytest.mark.anyio class TestAsyncEventBroker: async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None: send, receive = create_memory_object_stream(2) - async with async_broker: - async_broker.subscribe(send.send) - async_broker.subscribe(send.send_nowait) - event = ScheduleAdded( - schedule_id="schedule1", - next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), - ) - await async_broker.publish(event) - - with fail_after(3): - event1 = await receive.receive() - event2 = await receive.receive() + async_broker.subscribe(send.send) + async_broker.subscribe(send.send_nowait) + event = ScheduleAdded( + schedule_id="schedule1", + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), + ) + await async_broker.publish(event) + + with fail_after(3): + event1 = await receive.receive() + event2 = await receive.receive() assert event1 == event2 assert isinstance(event1, ScheduleAdded) @@ -200,47 +198,43 @@ class TestAsyncEventBroker: async def test_subscribe_one_shot(self, async_broker: AsyncEventBroker) -> None: send, receive = create_memory_object_stream(2) - async with async_broker: - async_broker.subscribe(send.send, one_shot=True) - event = ScheduleAdded( - schedule_id="schedule1", - next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), - ) - await async_broker.publish(event) - event = ScheduleAdded( - schedule_id="schedule2", - next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc), - ) - await async_broker.publish(event) - - with fail_after(3): - received_event = await receive.receive() - - with pytest.raises(TimeoutError), fail_after(0.1): - await receive.receive() + async_broker.subscribe(send.send, one_shot=True) + event = ScheduleAdded( + schedule_id="schedule1", + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), + ) + await async_broker.publish(event) + event = ScheduleAdded( + schedule_id="schedule2", + next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc), + ) + await async_broker.publish(event) + + with fail_after(3): + received_event = await receive.receive() + + with pytest.raises(TimeoutError), fail_after(0.1): + await receive.receive() assert isinstance(received_event, ScheduleAdded) assert received_event.schedule_id == "schedule1" async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None: send, receive = create_memory_object_stream() - async with async_broker: - subscription = async_broker.subscribe(send.send) - await async_broker.publish(Event()) - with fail_after(3): - await receive.receive() + subscription = async_broker.subscribe(send.send) + await async_broker.publish(Event()) + with fail_after(3): + await receive.receive() - subscription.unsubscribe() - await async_broker.publish(Event()) - with pytest.raises(TimeoutError), fail_after(0.1): - await receive.receive() + subscription.unsubscribe() + await async_broker.publish(Event()) + with pytest.raises(TimeoutError), fail_after(0.1): + await receive.receive() async def test_publish_no_subscribers( self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture ) -> None: - async with async_broker: - await async_broker.publish(Event()) - + await async_broker.publish(Event()) assert not caplog.text async def test_publish_exception( @@ -251,11 +245,10 @@ class TestAsyncEventBroker: timestamp = datetime.now(timezone.utc) send, receive = create_memory_object_stream() - async with async_broker: - async_broker.subscribe(bad_subscriber) - async_broker.subscribe(send.send) - await async_broker.publish(Event(timestamp=timestamp)) + async_broker.subscribe(bad_subscriber) + async_broker.subscribe(send.send) + await async_broker.publish(Event(timestamp=timestamp)) - received_event = await receive.receive() - assert received_event.timestamp == timestamp - assert "Error delivering Event" in caplog.text + received_event = await receive.receive() + assert received_event.timestamp == timestamp + assert "Error delivering Event" in caplog.text diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index b8c64a2..e33a91e 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -18,7 +18,6 @@ from apscheduler.events import ( JobAdded, ScheduleAdded, ScheduleRemoved, - SchedulerStarted, SchedulerStopped, TaskAdded, ) @@ -28,6 +27,7 @@ from apscheduler.schedulers.sync import Scheduler from apscheduler.structures import Job, Task from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.workers.async_ import AsyncWorker if sys.version_info >= (3, 9): from zoneinfo import ZoneInfo @@ -57,23 +57,18 @@ class TestAsyncScheduler: async def test_schedule_job(self) -> None: def listener(received_event: Event) -> None: received_events.append(received_event) - if len(received_events) == 5: + if isinstance(received_event, ScheduleRemoved): event.set() received_events: list[Event] = [] event = anyio.Event() - scheduler = AsyncScheduler(start_worker=False) - scheduler.events.subscribe(listener) trigger = DateTrigger(datetime.now(timezone.utc)) - async with scheduler: + async with AsyncScheduler(start_worker=False) as scheduler: + scheduler.event_broker.subscribe(listener) await scheduler.add_schedule(dummy_async_job, trigger, id="foo") with fail_after(3): await event.wait() - # The scheduler was first started - received_event = received_events.pop(0) - assert isinstance(received_event, SchedulerStarted) - # Then the task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) @@ -129,7 +124,7 @@ class TestAsyncScheduler: async with AsyncScheduler(start_worker=False) as scheduler: trigger = IntervalTrigger(seconds=3, start_time=orig_start_time) job_added_event = anyio.Event() - scheduler.events.subscribe(job_added_listener, {JobAdded}) + scheduler.event_broker.subscribe(job_added_listener, {JobAdded}) schedule_id = await scheduler.add_schedule( dummy_async_job, trigger, max_jitter=max_jitter ) @@ -142,7 +137,8 @@ class TestAsyncScheduler: fake_uniform.assert_called_once_with(0, expected_upper_bound) - # Check that the job was created with the proper amount of jitter in its scheduled time + # Check that the job was created with the proper amount of jitter in its + # scheduled time jobs = await scheduler.data_store.get_jobs({job_id}) assert jobs[0].jitter == timedelta(seconds=jitter) assert jobs[0].scheduled_fire_time == orig_start_time + timedelta( @@ -188,7 +184,7 @@ class TestAsyncScheduler: async def test_contextvars(self) -> None: def check_contextvars() -> None: assert current_scheduler.get() is scheduler - assert current_worker.get() is scheduler.worker + assert isinstance(current_worker.get(), AsyncWorker) info = job_info.get() assert info.task_id == "task_id" assert info.schedule_id == "foo" @@ -223,23 +219,18 @@ class TestSyncScheduler: def test_schedule_job(self): def listener(received_event: Event) -> None: received_events.append(received_event) - if len(received_events) == 5: + if isinstance(received_event, ScheduleRemoved): event.set() received_events: list[Event] = [] event = threading.Event() - scheduler = Scheduler(start_worker=False) - scheduler.events.subscribe(listener) trigger = DateTrigger(datetime.now(timezone.utc)) - with scheduler: + with Scheduler(start_worker=False) as scheduler: + scheduler.event_broker.subscribe(listener) scheduler.add_schedule(dummy_sync_job, trigger, id="foo") event.wait(3) - # The scheduler was first started - received_event = received_events.pop(0) - assert isinstance(received_event, SchedulerStarted) - - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "test_schedulers:dummy_sync_job" @@ -264,9 +255,6 @@ class TestSyncScheduler: received_event = received_events.pop(0) assert isinstance(received_event, SchedulerStopped) - # There should be no more events on the list - assert not received_events - @pytest.mark.parametrize( "max_jitter, expected_upper_bound", [pytest.param(2, 2, id="within"), pytest.param(4, 2.999999, id="exceed")], @@ -293,7 +281,7 @@ class TestSyncScheduler: with Scheduler(start_worker=False) as scheduler: trigger = IntervalTrigger(seconds=3, start_time=orig_start_time) job_added_event = threading.Event() - scheduler.events.subscribe(job_added_listener, {JobAdded}) + scheduler.event_broker.subscribe(job_added_listener, {JobAdded}) schedule_id = scheduler.add_schedule( dummy_async_job, trigger, max_jitter=max_jitter ) @@ -350,7 +338,7 @@ class TestSyncScheduler: def test_contextvars(self) -> None: def check_contextvars() -> None: assert current_scheduler.get() is scheduler - assert current_worker.get() is scheduler.worker + assert current_worker.get() is not None info = job_info.get() assert info.task_id == "task_id" assert info.schedule_id == "foo" diff --git a/tests/test_workers.py b/tests/test_workers.py index 6e5568f..e1c7421 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -17,7 +17,6 @@ from apscheduler.events import ( JobAdded, JobReleased, TaskAdded, - WorkerStarted, WorkerStopped, ) from apscheduler.structures import Task @@ -55,26 +54,20 @@ class TestAsyncWorker: ) -> None: def listener(received_event: Event): received_events.append(received_event) - if len(received_events) == 5: + if isinstance(received_event, JobReleased): event.set() received_events: list[Event] = [] event = anyio.Event() - data_store = MemoryDataStore() - worker = AsyncWorker(data_store) - worker.events.subscribe(listener) - async with worker: + async with AsyncWorker(MemoryDataStore()) as worker: + worker.event_broker.subscribe(listener) await worker.data_store.add_task(Task(id="task_id", func=target_func)) job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail}) await worker.data_store.add_job(job) with fail_after(3): await event.wait() - # The worker was first started - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStarted) - - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "task_id" @@ -112,16 +105,14 @@ class TestAsyncWorker: async def test_run_deadline_missed(self) -> None: def listener(received_event: Event): received_events.append(received_event) - if len(received_events) == 5: + if isinstance(received_event, JobReleased): event.set() scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) received_events: list[Event] = [] event = anyio.Event() - data_store = MemoryDataStore() - worker = AsyncWorker(data_store) - worker.events.subscribe(listener) - async with worker: + async with AsyncWorker(MemoryDataStore()) as worker: + worker.event_broker.subscribe(listener) await worker.data_store.add_task(Task(id="task_id", func=fail_func)) job = Job( task_id="task_id", @@ -133,11 +124,7 @@ class TestAsyncWorker: with fail_after(3): await event.wait() - # The worker was first started - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStarted) - - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "task_id" @@ -175,25 +162,19 @@ class TestSyncWorker: def test_run_job_nonscheduled(self, fail: bool) -> None: def listener(received_event: Event): received_events.append(received_event) - if len(received_events) == 5: + if isinstance(received_event, JobReleased): event.set() received_events: list[Event] = [] event = threading.Event() - data_store = MemoryDataStore() - worker = Worker(data_store) - worker.events.subscribe(listener) - with worker: + with Worker(MemoryDataStore()) as worker: + worker.event_broker.subscribe(listener) worker.data_store.add_task(Task(id="task_id", func=sync_func)) job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail}) worker.data_store.add_job(job) - event.wait(5) - - # The worker was first started - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStarted) + event.wait(3) - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "task_id" @@ -229,18 +210,16 @@ class TestSyncWorker: assert not received_events def test_run_deadline_missed(self) -> None: - def listener(worker_event: Event): - received_events.append(worker_event) - if len(received_events) == 5: + def listener(received_event: Event): + received_events.append(received_event) + if isinstance(received_event, JobReleased): event.set() scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) received_events: list[Event] = [] event = threading.Event() - data_store = MemoryDataStore() - worker = Worker(data_store) - worker.events.subscribe(listener) - with worker: + with Worker(MemoryDataStore()) as worker: + worker.event_broker.subscribe(listener) worker.data_store.add_task(Task(id="task_id", func=fail_func)) job = Job( task_id="task_id", @@ -251,11 +230,7 @@ class TestSyncWorker: worker.data_store.add_job(job) event.wait(3) - # The worker was first started - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStarted) - - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "task_id" -- cgit v1.2.1