From c5727432736b55b7d76753307f14efdb962c2edf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Mon, 12 Sep 2022 22:09:05 +0300
Subject: Major refactoring

- Made SyncScheduler a synchronous wrapper for AsyncScheduler
- Removed workers as a user interface
- Removed synchronous interfaces for data stores and event brokers and
  refactored existing implementations to use the async interface
- Added the current_async_scheduler contextvar
- Added job executors
---
 tests/conftest.py          |  194 +++++-
 tests/test_datastores.py   | 1435 +++++++++++++------------------------------
 tests/test_eventbrokers.py |  325 +++-------
 tests/test_schedulers.py   |   36 +-
 tests/test_workers.py      |  281 ---------
 5 files changed, 710 insertions(+), 1561 deletions(-)
 delete mode 100644 tests/test_workers.py

diff --git a/tests/conftest.py b/tests/conftest.py
index 31ea9b0..7497b52 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,10 +1,16 @@
 from __future__ import annotations
 
 import sys
+from contextlib import AsyncExitStack
+from tempfile import TemporaryDirectory
+from typing import Any, AsyncGenerator, cast
 
 import pytest
+from _pytest.fixtures import SubRequest
+from pytest_lazyfixture import lazy_fixture
 
-from apscheduler.abc import Serializer
+from apscheduler.abc import DataStore, EventBroker, Serializer
+from apscheduler.datastores.memory import MemoryDataStore
 from apscheduler.serializers.cbor import CBORSerializer
 from apscheduler.serializers.json import JSONSerializer
 from apscheduler.serializers.pickle import PickleSerializer
@@ -34,3 +40,189 @@ def serializer(request) -> Serializer | None:
 @pytest.fixture
 def anyio_backend() -> str:
     return "asyncio"
+
+
+@pytest.fixture
+def local_broker() -> EventBroker:
+    from apscheduler.eventbrokers.local import LocalEventBroker
+
+    return LocalEventBroker()
+
+
+@pytest.fixture
+async def redis_broker(serializer: Serializer) -> EventBroker:
+    from apscheduler.eventbrokers.redis import RedisEventBroker
+
+    broker = RedisEventBroker.from_url(
+        "redis://localhost:6379", serializer=serializer, stop_check_interval=0.05
+    )
+    await broker.client.flushdb()
+    return broker
+
+
+@pytest.fixture
+def mqtt_broker(serializer: Serializer) -> EventBroker:
+    from paho.mqtt.client import Client
+
+    from apscheduler.eventbrokers.mqtt import MQTTEventBroker
+
+    return MQTTEventBroker(Client(), serializer=serializer)
+
+
+@pytest.fixture
+async def asyncpg_broker(serializer: Serializer) -> EventBroker:
+    from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
+
+    broker = AsyncpgEventBroker.from_dsn(
+        "postgres://postgres:secret@localhost:5432/testdb", serializer=serializer
+    )
+    return broker
+
+
+@pytest.fixture(
+    params=[
+        pytest.param(lazy_fixture("local_broker"), id="local"),
+        pytest.param(
+            lazy_fixture("asyncpg_broker"),
+            id="asyncpg",
+            marks=[pytest.mark.external_service],
+        ),
+        pytest.param(
+            lazy_fixture("redis_broker"),
+            id="redis",
+            marks=[pytest.mark.external_service],
+        ),
+        pytest.param(
+            lazy_fixture("mqtt_broker"), id="mqtt", marks=[pytest.mark.external_service]
+        ),
+    ]
+)
+async def raw_event_broker(request: SubRequest) -> EventBroker:
+    return cast(EventBroker, request.param)
+
+
+@pytest.fixture
+async def event_broker(
+    raw_event_broker: EventBroker,
+) -> AsyncGenerator[EventBroker, Any]:
+    async with AsyncExitStack() as exit_stack:
+        await raw_event_broker.start(exit_stack)
+        yield raw_event_broker
+
+
+@pytest.fixture
+def memory_store() -> DataStore:
+    yield MemoryDataStore()
+
+
+@pytest.fixture
+def mongodb_store() -> DataStore:
+    from pymongo import MongoClient
+
+    from apscheduler.datastores.mongodb import MongoDBDataStore
+
+    with MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) as client:
+        yield MongoDBDataStore(client, start_from_scratch=True)
+
+
+@pytest.fixture
+def sqlite_store() -> DataStore:
+    from sqlalchemy.future import create_engine
+
+    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+    with TemporaryDirectory("sqlite_") as tempdir:
+        engine = create_engine(f"sqlite:///{tempdir}/test.db")
+        try:
+            yield SQLAlchemyDataStore(engine)
+        finally:
+            engine.dispose()
+
+
+@pytest.fixture
+def psycopg2_store() -> DataStore:
+    from sqlalchemy.future import create_engine
+
+    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+    engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb")
+    try:
+        yield SQLAlchemyDataStore(engine, schema="alter", start_from_scratch=True)
+    finally:
+        engine.dispose()
+
+
+@pytest.fixture
+def mysql_store() -> DataStore:
+    from sqlalchemy.future import create_engine
+
+    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+    engine = create_engine("mysql+pymysql://root:secret@localhost/testdb")
+    try:
+        yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+    finally:
+        engine.dispose()
+
+
+@pytest.fixture
+async def asyncpg_store() -> DataStore:
+    from sqlalchemy.ext.asyncio import create_async_engine
+
+    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+    engine = create_async_engine(
+        "postgresql+asyncpg://postgres:secret@localhost/testdb", future=True
+    )
+    try:
+        yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+    finally:
+        await engine.dispose()
+
+
+@pytest.fixture
+async def asyncmy_store() -> DataStore:
+    from sqlalchemy.ext.asyncio import create_async_engine
+
+    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+    engine = create_async_engine(
+        "mysql+asyncmy://root:secret@localhost/testdb?charset=utf8mb4", future=True
+    )
+    try:
+        yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+    finally:
+        await engine.dispose()
+
+
+@pytest.fixture(
+    params=[
+        pytest.param(
+            lazy_fixture("asyncpg_store"),
+            id="asyncpg",
+            marks=[pytest.mark.external_service],
+        ),
+        pytest.param(
+            lazy_fixture("asyncmy_store"),
+            id="asyncmy",
+            marks=[pytest.mark.external_service],
+        ),
+        pytest.param(
+            lazy_fixture("mongodb_store"),
+            id="mongodb",
+            marks=[pytest.mark.external_service],
+        ),
+    ]
+)
+async def raw_datastore(request: SubRequest) -> DataStore:
+    return cast(DataStore, request.param)
+
+
+@pytest.fixture
+async def datastore(
+    raw_datastore: DataStore, local_broker: EventBroker
+) -> AsyncGenerator[DataStore, Any]:
+    async with AsyncExitStack() as exit_stack:
+        await local_broker.start(exit_stack)
+        await raw_datastore.start(exit_stack, local_broker)
+        yield raw_datastore
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 9c9a0cf..618369a 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -1,18 +1,13 @@
 from __future__ import annotations
 
-import threading
-from collections.abc import Generator
-from contextlib import asynccontextmanager, contextmanager
+from contextlib import AsyncExitStack, asynccontextmanager
 from datetime import datetime, timedelta, timezone
-from tempfile import TemporaryDirectory
-from typing import Any, AsyncGenerator, cast
+from typing import AsyncGenerator
 
 import anyio
 import pytest
-from _pytest.fixtures import SubRequest
 from anyio import CancelScope
 from freezegun.api import FrozenDateTimeFactory
-from pytest_lazyfixture import lazy_fixture
 
 from apscheduler import (
     CoalescePolicy,
@@ -30,1055 +25,483 @@ from apscheduler import (
     TaskLookupError,
     TaskUpdated,
 )
-from apscheduler.abc import AsyncDataStore, AsyncEventBroker, DataStore, EventBroker
-from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
-from apscheduler.datastores.memory import MemoryDataStore
-from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker
-from apscheduler.eventbrokers.local import LocalEventBroker
+from apscheduler.abc import DataStore, EventBroker
 from apscheduler.triggers.date import DateTrigger
 
+pytestmark = pytest.mark.anyio
+
 
 @pytest.fixture
-def memory_store() -> DataStore:
-    yield MemoryDataStore()
+def schedules() -> list[Schedule]:
+    trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+    schedule1 = Schedule(id="s1", task_id="task1", trigger=trigger)
+    schedule1.next_fire_time = trigger.next()
 
+    trigger = DateTrigger(datetime(2020, 9, 14, tzinfo=timezone.utc))
+    schedule2 = Schedule(id="s2", task_id="task2", trigger=trigger)
+    schedule2.next_fire_time = trigger.next()
 
-@pytest.fixture
-def adapted_memory_store() -> AsyncDataStore:
-    store = MemoryDataStore()
-    return AsyncDataStoreAdapter(store)
+    trigger = DateTrigger(datetime(2020, 9, 15, tzinfo=timezone.utc))
+    schedule3 = Schedule(id="s3", task_id="task1", trigger=trigger)
+    return [schedule1, schedule2, schedule3]
 
 
-@pytest.fixture
-def mongodb_store() -> DataStore:
-    from pymongo import MongoClient
+@asynccontextmanager
+async def capture_events(
+    datastore: DataStore,
+    limit: int,
+    event_types: set[type[Event]] | None = None,
+) -> AsyncGenerator[list[Event], None]:
+    def listener(event: Event) -> None:
+        events.append(event)
+        if len(events) == limit:
+            limit_event.set()
+            subscription.unsubscribe()
+
+    events: list[Event] = []
+    limit_event = anyio.Event()
+    subscription = datastore._event_broker.subscribe(listener, event_types)
+    yield events
+    if limit:
+        with anyio.fail_after(3):
+            await limit_event.wait()
+
+
+async def test_add_replace_task(datastore: DataStore) -> None:
+    import math
+
+    event_types = {TaskAdded, TaskUpdated}
+    async with capture_events(datastore, 3, event_types) as events:
+        await datastore.add_task(Task(id="test_task", func=print, executor="async"))
+        await datastore.add_task(
+            Task(id="test_task2", func=math.ceil, executor="async")
+        )
+        await datastore.add_task(Task(id="test_task", func=repr, executor="async"))
 
-    from apscheduler.datastores.mongodb import MongoDBDataStore
+    tasks = await datastore.get_tasks()
+    assert len(tasks) == 2
+    assert tasks[0].id == "test_task"
+    assert tasks[0].func is repr
+    assert tasks[1].id == "test_task2"
+    assert tasks[1].func is math.ceil
 
-    with MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) as client:
-        yield MongoDBDataStore(client, start_from_scratch=True)
+    received_event = events.pop(0)
+    assert isinstance(received_event, TaskAdded)
+    assert received_event.task_id == "test_task"
 
+    received_event = events.pop(0)
+    assert isinstance(received_event, TaskAdded)
+    assert received_event.task_id == "test_task2"
 
-@pytest.fixture
-def sqlite_store() -> DataStore:
-    from sqlalchemy.future import create_engine
+    received_event = events.pop(0)
+    assert isinstance(received_event, TaskUpdated)
+    assert received_event.task_id == "test_task"
 
-    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+    assert not events
 
-    with TemporaryDirectory("sqlite_") as tempdir:
-        engine = create_engine(f"sqlite:///{tempdir}/test.db")
-        try:
-            yield SQLAlchemyDataStore(engine)
-        finally:
-            engine.dispose()
 
+async def test_add_schedules(datastore: DataStore, schedules: list[Schedule]) -> None:
+    async with capture_events(datastore, 3, {ScheduleAdded}) as events:
+        for schedule in schedules:
+            await datastore.add_schedule(schedule, ConflictPolicy.exception)
 
-@pytest.fixture
-def psycopg2_store() -> DataStore:
-    from sqlalchemy.future import create_engine
+    assert await datastore.get_schedules() == schedules
+    assert await datastore.get_schedules({"s1", "s2", "s3"}) == schedules
+    assert await datastore.get_schedules({"s1"}) == [schedules[0]]
+    assert await datastore.get_schedules({"s2"}) == [schedules[1]]
+    assert await datastore.get_schedules({"s3"}) == [schedules[2]]
 
-    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+    for event, schedule in zip(events, schedules):
+        assert event.schedule_id == schedule.id
+        assert event.next_fire_time == schedule.next_fire_time
 
-    engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb")
-    try:
-        yield SQLAlchemyDataStore(engine, schema="alter", start_from_scratch=True)
-    finally:
-        engine.dispose()
 
+async def test_replace_schedules(
+    datastore: DataStore, schedules: list[Schedule]
+) -> None:
+    async with capture_events(datastore, 1, {ScheduleUpdated}) as events:
+        for schedule in schedules:
+            await datastore.add_schedule(schedule, ConflictPolicy.exception)
 
-@pytest.fixture
-def mysql_store() -> DataStore:
-    from sqlalchemy.future import create_engine
+    next_fire_time = schedules[2].trigger.next()
+    schedule = Schedule(
+        id="s3",
+        task_id="foo",
+        trigger=schedules[2].trigger,
+        args=(),
+        kwargs={},
+        coalesce=CoalescePolicy.earliest,
+        misfire_grace_time=None,
+        tags=frozenset(),
+    )
+    schedule.next_fire_time = next_fire_time
+    await datastore.add_schedule(schedule, ConflictPolicy.replace)
+
+    schedules = await datastore.get_schedules({schedule.id})
+    assert schedules[0].task_id == "foo"
+    assert schedules[0].next_fire_time == next_fire_time
+    assert schedules[0].args == ()
+    assert schedules[0].kwargs == {}
+    assert schedules[0].coalesce is CoalescePolicy.earliest
+    assert schedules[0].misfire_grace_time is None
+    assert schedules[0].tags == frozenset()
+
+    received_event = events.pop(0)
+    assert received_event.schedule_id == "s3"
+    assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
+    assert not events
+
+
+async def test_remove_schedules(
+    datastore: DataStore, schedules: list[Schedule]
+) -> None:
+    async with capture_events(datastore, 2, {ScheduleRemoved}) as events:
+        for schedule in schedules:
+            await datastore.add_schedule(schedule, ConflictPolicy.exception)
 
-    from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+        await datastore.remove_schedules(["s1", "s2"])
+        assert await datastore.get_schedules() == [schedules[2]]
 
-    engine = create_engine("mysql+pymysql://root:secret@localhost/testdb")
-    try:
-        yield SQLAlchemyDataStore(engine, start_from_scratch=True)
-    finally:
-        engine.dispose()
+    received_event = events.pop(0)
+    assert received_event.schedule_id == "s1"
 
+    received_event = events.pop(0)
+    assert received_event.schedule_id == "s2"
 
-@pytest.fixture
-async def asyncpg_store() -> AsyncDataStore:
-    from sqlalchemy.ext.asyncio import create_async_engine
+    assert not events
 
-    from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
 
-    engine = create_async_engine(
"postgresql+asyncpg://postgres:secret@localhost/testdb", future=True +@pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc)) +async def test_acquire_release_schedules( + datastore: DataStore, schedules: list[Schedule] +) -> None: + event_types = {ScheduleRemoved, ScheduleUpdated} + async with capture_events(datastore, 2, event_types) as events: + for schedule in schedules: + await datastore.add_schedule(schedule, ConflictPolicy.exception) + + # The first scheduler gets the first due schedule + schedules1 = await datastore.acquire_schedules("dummy-id1", 1) + assert len(schedules1) == 1 + assert schedules1[0].id == "s1" + + # The second scheduler gets the second due schedule + schedules2 = await datastore.acquire_schedules("dummy-id2", 1) + assert len(schedules2) == 1 + assert schedules2[0].id == "s2" + + # The third scheduler gets nothing + schedules3 = await datastore.acquire_schedules("dummy-id3", 1) + assert not schedules3 + + # Update the schedules and check that the job store actually deletes the + # first one and updates the second one + schedules1[0].next_fire_time = None + schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc) + + # Release all the schedules + await datastore.release_schedules("dummy-id1", schedules1) + await datastore.release_schedules("dummy-id2", schedules2) + + # Check that the first schedule is gone + schedules = await datastore.get_schedules() + assert len(schedules) == 2 + assert schedules[0].id == "s2" + assert schedules[1].id == "s3" + + # Check for the appropriate update and delete events + received_event = events.pop(0) + assert isinstance(received_event, ScheduleRemoved) + assert received_event.schedule_id == "s1" + + received_event = events.pop(0) + assert isinstance(received_event, ScheduleUpdated) + assert received_event.schedule_id == "s2" + assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) + + assert not events + + +async def test_release_schedule_two_identical_fire_times(datastore: DataStore) -> None: + """Regression test for #616.""" + for i in range(1, 3): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + await datastore.add_schedule(schedule, ConflictPolicy.exception) + + schedules = await datastore.acquire_schedules("foo", 3) + schedules[0].next_fire_time = None + await datastore.release_schedules("foo", schedules) + + remaining = await datastore.get_schedules({s.id for s in schedules}) + assert len(remaining) == 1 + assert remaining[0].id == schedules[1].id + + +async def test_release_two_schedules_at_once(datastore: DataStore) -> None: + """Regression test for #621.""" + for i in range(2): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + await datastore.add_schedule(schedule, ConflictPolicy.exception) + + schedules = await datastore.acquire_schedules("foo", 3) + await datastore.release_schedules("foo", schedules) + + remaining = await datastore.get_schedules({s.id for s in schedules}) + assert len(remaining) == 2 + + +async def test_acquire_schedules_lock_timeout( + datastore: DataStore, schedules: list[Schedule], freezer +) -> None: + """ + Test that a scheduler can acquire schedules that were acquired by another + scheduler but not released within the lock timeout period. 
+
+    """
+    await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
+
+    # First, one scheduler acquires the first available schedule
+    acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
+    assert len(acquired1) == 1
+    assert acquired1[0].id == "s1"
+
+    # Try to acquire the schedule just at the threshold (now == acquired_until).
+    # This should not yield any schedules.
+    freezer.tick(30)
+    acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
+    assert not acquired2
+
+    # Right after that, the schedule should be available
+    freezer.tick(1)
+    acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
+    assert len(acquired3) == 1
+    assert acquired3[0].id == "s1"
+
+
+async def test_acquire_multiple_workers(datastore: DataStore) -> None:
+    await datastore.add_task(
+        Task(id="task1", func=asynccontextmanager, executor="async")
     )
-    try:
-        yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True)
-    finally:
-        await engine.dispose()
+    jobs = [Job(task_id="task1") for _ in range(2)]
+    for job in jobs:
+        await datastore.add_job(job)
 
+    # The first worker gets the first job in the queue
+    jobs1 = await datastore.acquire_jobs("worker1", 1)
+    assert len(jobs1) == 1
+    assert jobs1[0].id == jobs[0].id
 
-@pytest.fixture
-async def asyncmy_store() -> AsyncDataStore:
-    from sqlalchemy.ext.asyncio import create_async_engine
+    # The second worker gets the second job
+    jobs2 = await datastore.acquire_jobs("worker2", 1)
+    assert len(jobs2) == 1
+    assert jobs2[0].id == jobs[1].id
+
+    # The third worker gets nothing
+    jobs3 = await datastore.acquire_jobs("worker3", 1)
+    assert not jobs3
 
-    from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
 
-    engine = create_async_engine(
-        "mysql+asyncmy://root:secret@localhost/testdb?charset=utf8mb4", future=True
+async def test_job_release_success(datastore: DataStore) -> None:
+    await datastore.add_task(
+        Task(id="task1", func=asynccontextmanager, executor="async")
     )
-    try:
-        yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True)
-    finally:
-        await engine.dispose()
+    job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+    await datastore.add_job(job)
+
+    acquired = await datastore.acquire_jobs("worker_id", 2)
+    assert len(acquired) == 1
+    assert acquired[0].id == job.id
+
+    await datastore.release_job(
+        "worker_id",
+        acquired[0].task_id,
+        JobResult.from_job(
+            acquired[0],
+            JobOutcome.success,
+            return_value="foo",
+        ),
+    )
+    result = await datastore.get_job_result(acquired[0].id)
+    assert result.outcome is JobOutcome.success
+    assert result.exception is None
+    assert result.return_value == "foo"
 
+    # Check that the job and its result are gone
+    assert not await datastore.get_jobs({acquired[0].id})
+    assert not await datastore.get_job_result(acquired[0].id)
 
-@pytest.fixture
-def schedules() -> list[Schedule]:
-    trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
-    schedule1 = Schedule(id="s1", task_id="task1", trigger=trigger)
-    schedule1.next_fire_time = trigger.next()
 
-    trigger = DateTrigger(datetime(2020, 9, 14, tzinfo=timezone.utc))
-    schedule2 = Schedule(id="s2", task_id="task2", trigger=trigger)
-    schedule2.next_fire_time = trigger.next()
+async def test_job_release_failure(datastore: DataStore) -> None:
+    await datastore.add_task(
+        Task(id="task1", executor="async", func=asynccontextmanager)
+    )
+    job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+    await datastore.add_job(job)
+
+    acquired = await datastore.acquire_jobs("worker_id", 2)
+    assert len(acquired) == 1
+    assert acquired[0].id == job.id
+
+    await datastore.release_job(
+        "worker_id",
+        acquired[0].task_id,
+        JobResult.from_job(
+            acquired[0],
+            JobOutcome.error,
+            exception=ValueError("foo"),
+        ),
+    )
+    result = await datastore.get_job_result(acquired[0].id)
+    assert result.outcome is JobOutcome.error
+    assert isinstance(result.exception, ValueError)
+    assert result.exception.args == ("foo",)
+    assert result.return_value is None
 
-    trigger = DateTrigger(datetime(2020, 9, 15, tzinfo=timezone.utc))
-    schedule3 = Schedule(id="s3", task_id="task1", trigger=trigger)
-    return [schedule1, schedule2, schedule3]
+    # Check that the job and its result are gone
+    assert not await datastore.get_jobs({acquired[0].id})
+    assert not await datastore.get_job_result(acquired[0].id)
 
 
-class TestDataStores:
-    @contextmanager
-    def capture_events(
-        self,
-        datastore: DataStore,
-        limit: int,
-        event_types: set[type[Event]] | None = None,
-    ) -> Generator[list[Event], None, None]:
-        def listener(event: Event) -> None:
-            events.append(event)
-            if len(events) == limit:
-                limit_event.set()
-                subscription.unsubscribe()
-
-        events: list[Event] = []
-        limit_event = threading.Event()
-        subscription = datastore.events.subscribe(listener, event_types)
-        yield events
-        if limit:
-            limit_event.wait(2)
-
-    @pytest.fixture
-    def event_broker(self) -> Generator[EventBroker, Any, None]:
-        broker = LocalEventBroker()
-        broker.start()
-        yield broker
-        broker.stop()
-
-    @pytest.fixture(
-        params=[
-            pytest.param(lazy_fixture("memory_store"), id="memory"),
-            pytest.param(lazy_fixture("sqlite_store"), id="sqlite"),
-            pytest.param(
-                lazy_fixture("mongodb_store"),
-                id="mongodb",
-                marks=[pytest.mark.external_service],
-            ),
-            pytest.param(
-                lazy_fixture("psycopg2_store"),
-                id="psycopg2",
-                marks=[pytest.mark.external_service],
-            ),
-            pytest.param(
-                lazy_fixture("mysql_store"),
-                id="mysql",
-                marks=[pytest.mark.external_service],
-            ),
-        ]
+async def test_job_release_missed_deadline(datastore: DataStore):
+    await datastore.add_task(
+        Task(id="task1", func=asynccontextmanager, executor="async")
     )
-    def datastore(
-        self, request: SubRequest, event_broker: EventBroker
-    ) -> Generator[DataStore, Any, None]:
-        datastore = cast(DataStore, request.param)
-        datastore.start(event_broker)
-        yield datastore
-        datastore.stop()
-
-    def test_add_replace_task(self, datastore: DataStore) -> None:
-        import math
-
-        event_types = {TaskAdded, TaskUpdated}
-        with self.capture_events(datastore, 3, event_types) as events:
-            datastore.add_task(Task(id="test_task", func=print))
-            datastore.add_task(Task(id="test_task2", func=math.ceil))
-            datastore.add_task(Task(id="test_task", func=repr))
-
-        tasks = datastore.get_tasks()
-        assert len(tasks) == 2
-        assert tasks[0].id == "test_task"
-        assert tasks[0].func is repr
-        assert tasks[1].id == "test_task2"
-        assert tasks[1].func is math.ceil
-
-        received_event = events.pop(0)
-        assert isinstance(received_event, TaskAdded)
-        assert received_event.task_id == "test_task"
-
-        received_event = events.pop(0)
-        assert isinstance(received_event, TaskAdded)
-        assert received_event.task_id == "test_task2"
-
-        received_event = events.pop(0)
-        assert isinstance(received_event, TaskUpdated)
-        assert received_event.task_id == "test_task"
-
-        assert not events
-
-    def test_add_schedules(
-        self, datastore: DataStore, schedules: list[Schedule]
-    ) -> None:
-        with self.capture_events(datastore, 3, {ScheduleAdded}) as events:
-            for schedule in schedules:
-                datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-        assert datastore.get_schedules() == schedules
-        assert datastore.get_schedules({"s1", "s2", "s3"}) == schedules
-        assert datastore.get_schedules({"s1"}) == [schedules[0]]
-        assert datastore.get_schedules({"s2"}) == [schedules[1]]
-        assert datastore.get_schedules({"s3"}) == [schedules[2]]
-
-        for event, schedule in zip(events, schedules):
-            assert event.schedule_id == schedule.id
-            assert event.next_fire_time == schedule.next_fire_time
-
-    def test_replace_schedules(
-        self, datastore: DataStore, schedules: list[Schedule]
-    ) -> None:
-        with self.capture_events(datastore, 1, {ScheduleUpdated}) as events:
-            for schedule in schedules:
-                datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-            next_fire_time = schedules[2].trigger.next()
-            schedule = Schedule(
-                id="s3",
-                task_id="foo",
-                trigger=schedules[2].trigger,
-                args=(),
-                kwargs={},
-                coalesce=CoalescePolicy.earliest,
-                misfire_grace_time=None,
-                tags=frozenset(),
-            )
-            schedule.next_fire_time = next_fire_time
-            datastore.add_schedule(schedule, ConflictPolicy.replace)
-
-            schedules = datastore.get_schedules({schedule.id})
-            assert schedules[0].task_id == "foo"
-            assert schedules[0].next_fire_time == next_fire_time
-            assert schedules[0].args == ()
-            assert schedules[0].kwargs == {}
-            assert schedules[0].coalesce is CoalescePolicy.earliest
-            assert schedules[0].misfire_grace_time is None
-            assert schedules[0].tags == frozenset()
-
-        received_event = events.pop(0)
-        assert received_event.schedule_id == "s3"
-        assert received_event.next_fire_time == datetime(
-            2020, 9, 15, tzinfo=timezone.utc
-        )
-        assert not events
-
-    def test_remove_schedules(
-        self, datastore: DataStore, schedules: list[Schedule]
-    ) -> None:
-        with self.capture_events(datastore, 2, {ScheduleRemoved}) as events:
-            for schedule in schedules:
-                datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-            datastore.remove_schedules(["s1", "s2"])
-            assert datastore.get_schedules() == [schedules[2]]
-
-        received_event = events.pop(0)
-        assert received_event.schedule_id == "s1"
-
-        received_event = events.pop(0)
-        assert received_event.schedule_id == "s2"
-
-        assert not events
-
-    @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
-    def test_acquire_release_schedules(
-        self, datastore: DataStore, schedules: list[Schedule]
-    ) -> None:
-        event_types = {ScheduleRemoved, ScheduleUpdated}
-        with self.capture_events(datastore, 2, event_types) as events:
-            for schedule in schedules:
-                datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-            # The first scheduler gets the first due schedule
-            schedules1 = datastore.acquire_schedules("dummy-id1", 1)
-            assert len(schedules1) == 1
-            assert schedules1[0].id == "s1"
-
-            # The second scheduler gets the second due schedule
-            schedules2 = datastore.acquire_schedules("dummy-id2", 1)
-            assert len(schedules2) == 1
-            assert schedules2[0].id == "s2"
-
-            # The third scheduler gets nothing
-            schedules3 = datastore.acquire_schedules("dummy-id3", 1)
-            assert not schedules3
-
-            # Update the schedules and check that the job store actually deletes the
-            # first one and updates the second one
-            schedules1[0].next_fire_time = None
-            schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
-
-            # Release all the schedules
-            datastore.release_schedules("dummy-id1", schedules1)
-            datastore.release_schedules("dummy-id2", schedules2)
-
-            # Check that the first schedule is gone
-            schedules = datastore.get_schedules()
-            assert len(schedules) == 2
-            assert schedules[0].id == "s2"
-            assert schedules[1].id == "s3"
-
-        # Check for the appropriate update and delete events
-        received_event = events.pop(0)
-        assert isinstance(received_event, ScheduleRemoved)
-        assert received_event.schedule_id == "s1"
-
-        received_event = events.pop(0)
-        assert isinstance(received_event, ScheduleUpdated)
-        assert received_event.schedule_id == "s2"
-        assert received_event.next_fire_time == datetime(
-            2020, 9, 15, tzinfo=timezone.utc
-        )
+    job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+    await datastore.add_job(job)
+
+    acquired = await datastore.acquire_jobs("worker_id", 2)
+    assert len(acquired) == 1
+    assert acquired[0].id == job.id
+
+    await datastore.release_job(
+        "worker_id",
+        acquired[0].task_id,
+        JobResult.from_job(
+            acquired[0],
+            JobOutcome.missed_start_deadline,
+        ),
+    )
+    result = await datastore.get_job_result(acquired[0].id)
+    assert result.outcome is JobOutcome.missed_start_deadline
+    assert result.exception is None
+    assert result.return_value is None
 
-        assert not events
-
-    def test_release_schedule_two_identical_fire_times(
-        self, datastore: DataStore
-    ) -> None:
-        """Regression test for #616."""
-        for i in range(1, 3):
-            trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
-            schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
-            schedule.next_fire_time = trigger.next()
-            datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-        schedules = datastore.acquire_schedules("foo", 3)
-        schedules[0].next_fire_time = None
-        datastore.release_schedules("foo", schedules)
-
-        remaining = datastore.get_schedules({s.id for s in schedules})
-        assert len(remaining) == 1
-        assert remaining[0].id == schedules[1].id
-
-    def test_release_two_schedules_at_once(self, datastore: DataStore) -> None:
-        """Regression test for #621."""
-        for i in range(2):
-            trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
-            schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
-            schedule.next_fire_time = trigger.next()
-            datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-        schedules = datastore.acquire_schedules("foo", 3)
-        datastore.release_schedules("foo", schedules)
-
-        remaining = datastore.get_schedules({s.id for s in schedules})
-        assert len(remaining) == 2
-
-    def test_acquire_schedules_lock_timeout(
-        self, datastore: DataStore, schedules: list[Schedule], freezer
-    ) -> None:
-        """
-        Test that a scheduler can acquire schedules that were acquired by another
-        scheduler but not released within the lock timeout period.
-
-        """
-        datastore.add_schedule(schedules[0], ConflictPolicy.exception)
-
-        # First, one scheduler acquires the first available schedule
-        acquired1 = datastore.acquire_schedules("dummy-id1", 1)
-        assert len(acquired1) == 1
-        assert acquired1[0].id == "s1"
-
-        # Try to acquire the schedule just at the threshold (now == acquired_until).
-        # This should not yield any schedules.
-        freezer.tick(30)
-        acquired2 = datastore.acquire_schedules("dummy-id2", 1)
-        assert not acquired2
-
-        # Right after that, the schedule should be available
-        freezer.tick(1)
-        acquired3 = datastore.acquire_schedules("dummy-id2", 1)
-        assert len(acquired3) == 1
-        assert acquired3[0].id == "s1"
-
-    def test_acquire_multiple_workers(self, datastore: DataStore) -> None:
-        datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        jobs = [Job(task_id="task1") for _ in range(2)]
-        for job in jobs:
-            datastore.add_job(job)
-
-        # The first worker gets the first job in the queue
-        jobs1 = datastore.acquire_jobs("worker1", 1)
-        assert len(jobs1) == 1
-        assert jobs1[0].id == jobs[0].id
-
-        # The second worker gets the second job
-        jobs2 = datastore.acquire_jobs("worker2", 1)
-        assert len(jobs2) == 1
-        assert jobs2[0].id == jobs[1].id
-
-        # The third worker gets nothing
-        jobs3 = datastore.acquire_jobs("worker3", 1)
-        assert not jobs3
-
-    def test_job_release_success(self, datastore: DataStore) -> None:
-        datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
-        datastore.add_job(job)
-
-        acquired = datastore.acquire_jobs("worker_id", 2)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        datastore.release_job(
-            "worker_id",
-            acquired[0].task_id,
-            JobResult.from_job(
-                acquired[0],
-                JobOutcome.success,
-                return_value="foo",
-            ),
-        )
-        result = datastore.get_job_result(acquired[0].id)
-        assert result.outcome is JobOutcome.success
-        assert result.exception is None
-        assert result.return_value == "foo"
-
-        # Check that the job and its result are gone
-        assert not datastore.get_jobs({acquired[0].id})
-        assert not datastore.get_job_result(acquired[0].id)
-
-    def test_job_release_failure(self, datastore: DataStore) -> None:
-        datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
-        datastore.add_job(job)
-
-        acquired = datastore.acquire_jobs("worker_id", 2)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        datastore.release_job(
-            "worker_id",
-            acquired[0].task_id,
-            JobResult.from_job(
-                acquired[0],
-                JobOutcome.error,
-                exception=ValueError("foo"),
-            ),
-        )
-        result = datastore.get_job_result(acquired[0].id)
-        assert result.outcome is JobOutcome.error
-        assert isinstance(result.exception, ValueError)
-        assert result.exception.args == ("foo",)
-        assert result.return_value is None
-
-        # Check that the job and its result are gone
-        assert not datastore.get_jobs({acquired[0].id})
-        assert not datastore.get_job_result(acquired[0].id)
-
-    def test_job_release_missed_deadline(self, datastore: DataStore):
-        datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
-        datastore.add_job(job)
-
-        acquired = datastore.acquire_jobs("worker_id", 2)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        datastore.release_job(
-            "worker_id",
-            acquired[0].task_id,
-            JobResult.from_job(
-                acquired[0],
-                JobOutcome.missed_start_deadline,
-            ),
-        )
-        result = datastore.get_job_result(acquired[0].id)
-        assert result.outcome is JobOutcome.missed_start_deadline
-        assert result.exception is None
-        assert result.return_value is None
-
-        # Check that the job and its result are gone
-        assert not datastore.get_jobs({acquired[0].id})
-        assert not datastore.get_job_result(acquired[0].id)
-
-    def test_job_release_cancelled(self, datastore: DataStore) -> None:
-        datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
-        datastore.add_job(job)
-
-        acquired = datastore.acquire_jobs("worker1", 2)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        datastore.release_job(
-            "worker1",
-            acquired[0].task_id,
-            JobResult.from_job(
-                acquired[0],
-                JobOutcome.cancelled,
-            ),
-        )
-        result = datastore.get_job_result(acquired[0].id)
-        assert result.outcome is JobOutcome.cancelled
-        assert result.exception is None
-        assert result.return_value is None
-
-        # Check that the job and its result are gone
-        assert not datastore.get_jobs({acquired[0].id})
-        assert not datastore.get_job_result(acquired[0].id)
-
-    def test_acquire_jobs_lock_timeout(
-        self, datastore: DataStore, freezer: FrozenDateTimeFactory
-    ) -> None:
-        """
-        Test that a worker can acquire jobs that were acquired by another scheduler but
-        not released within the lock timeout period.
-
-        """
-        datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1")
-        datastore.add_job(job)
-
-        # First, one worker acquires the first available job
-        acquired = datastore.acquire_jobs("worker1", 1)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        # Try to acquire the job just at the threshold (now == acquired_until).
-        # This should not yield any jobs.
-        freezer.tick(30)
-        assert not datastore.acquire_jobs("worker2", 1)
-
-        # Right after that, the job should be available
-        freezer.tick(1)
-        acquired = datastore.acquire_jobs("worker2", 1)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-    def test_acquire_jobs_max_number_exceeded(self, datastore: DataStore) -> None:
-        datastore.add_task(
-            Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
-        )
-        jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
-        for job in jobs:
-            datastore.add_job(job)
-
-        # Check that only 2 jobs are returned from acquire_jobs() even though the limit
-        # wqas 3
-        acquired_jobs = datastore.acquire_jobs("worker1", 3)
-        assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
-
-        # Release one job, and the worker should be able to acquire the third job
-        datastore.release_job(
-            "worker1",
-            acquired_jobs[0].task_id,
-            JobResult.from_job(
-                acquired_jobs[0],
-                JobOutcome.success,
-                return_value=None,
-            ),
-        )
-        acquired_jobs = datastore.acquire_jobs("worker1", 3)
-        assert [job.id for job in acquired_jobs] == [jobs[2].id]
-
-    def test_add_get_task(self, datastore: DataStore) -> None:
-        with pytest.raises(TaskLookupError):
-            datastore.get_task("dummyid")
-
-        datastore.add_task(Task(id="dummyid", func=asynccontextmanager))
-        task = datastore.get_task("dummyid")
-        assert task.id == "dummyid"
-        assert task.func is asynccontextmanager
-
-
-@pytest.mark.anyio
-class TestAsyncDataStores:
-    @asynccontextmanager
-    async def capture_events(
-        self,
-        datastore: AsyncDataStore,
-        limit: int,
-        event_types: set[type[Event]] | None = None,
-    ) -> AsyncGenerator[list[Event], None]:
-        def listener(event: Event) -> None:
-            events.append(event)
-            if len(events) == limit:
-                limit_event.set()
-                subscription.unsubscribe()
-
-        events: list[Event] = []
-        limit_event = anyio.Event()
-        subscription = datastore.events.subscribe(listener, event_types)
-        yield events
-        if limit:
-            with anyio.fail_after(3):
-                await limit_event.wait()
-
-    @pytest.fixture
-    async def event_broker(self) -> AsyncGenerator[AsyncEventBroker, Any]:
-        broker = LocalAsyncEventBroker()
-        await broker.start()
-        yield broker
-        await broker.stop()
-
-    @pytest.fixture(
-        params=[
-            pytest.param(lazy_fixture("adapted_memory_store"), id="memory"),
-            pytest.param(
-                lazy_fixture("asyncpg_store"),
-                id="asyncpg",
-                marks=[pytest.mark.external_service],
-            ),
-            pytest.param(
-                lazy_fixture("asyncmy_store"),
-                id="asyncmy",
-                marks=[pytest.mark.external_service],
-            ),
-        ]
+    # Check that the job and its result are gone
+    assert not await datastore.get_jobs({acquired[0].id})
+    assert not await datastore.get_job_result(acquired[0].id)
+
+
+async def test_job_release_cancelled(datastore: DataStore) -> None:
+    await datastore.add_task(
+        Task(id="task1", func=asynccontextmanager, executor="async")
     )
-    async def raw_datastore(
-        self, request: SubRequest, event_broker: AsyncEventBroker
-    ) -> AsyncDataStore:
-        return cast(AsyncDataStore, request.param)
-
-    @pytest.fixture
-    async def datastore(
-        self, raw_datastore: AsyncDataStore, event_broker: AsyncEventBroker
-    ) -> AsyncGenerator[AsyncDataStore, Any]:
-        await raw_datastore.start(event_broker)
-        yield raw_datastore
-        await raw_datastore.stop()
-
-    async def test_add_replace_task(self, datastore: AsyncDataStore) -> None:
-        import math
-
-        event_types = {TaskAdded, TaskUpdated}
-        async with self.capture_events(datastore, 3, event_types) as events:
-            await datastore.add_task(Task(id="test_task", func=print))
-            await datastore.add_task(Task(id="test_task2", func=math.ceil))
-            await datastore.add_task(Task(id="test_task", func=repr))
-
-        tasks = await datastore.get_tasks()
-        assert len(tasks) == 2
-        assert tasks[0].id == "test_task"
-        assert tasks[0].func is repr
-        assert tasks[1].id == "test_task2"
-        assert tasks[1].func is math.ceil
-
-        received_event = events.pop(0)
-        assert isinstance(received_event, TaskAdded)
-        assert received_event.task_id == "test_task"
-
-        received_event = events.pop(0)
-        assert isinstance(received_event, TaskAdded)
-        assert received_event.task_id == "test_task2"
-
-        received_event = events.pop(0)
-        assert isinstance(received_event, TaskUpdated)
-        assert received_event.task_id == "test_task"
-
-        assert not events
-
-    async def test_add_schedules(
-        self, datastore: AsyncDataStore, schedules: list[Schedule]
-    ) -> None:
-        async with self.capture_events(datastore, 3, {ScheduleAdded}) as events:
-            for schedule in schedules:
-                await datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-        assert await datastore.get_schedules() == schedules
-        assert await datastore.get_schedules({"s1", "s2", "s3"}) == schedules
-        assert await datastore.get_schedules({"s1"}) == [schedules[0]]
-        assert await datastore.get_schedules({"s2"}) == [schedules[1]]
-        assert await datastore.get_schedules({"s3"}) == [schedules[2]]
-
-        for event, schedule in zip(events, schedules):
-            assert event.schedule_id == schedule.id
-            assert event.next_fire_time == schedule.next_fire_time
-
-    async def test_replace_schedules(
-        self, datastore: AsyncDataStore, schedules: list[Schedule]
-    ) -> None:
-        async with self.capture_events(datastore, 1, {ScheduleUpdated}) as events:
-            for schedule in schedules:
-                await datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-            next_fire_time = schedules[2].trigger.next()
-            schedule = Schedule(
-                id="s3",
-                task_id="foo",
-                trigger=schedules[2].trigger,
-                args=(),
-                kwargs={},
-                coalesce=CoalescePolicy.earliest,
-                misfire_grace_time=None,
-                tags=frozenset(),
-            )
-            schedule.next_fire_time = next_fire_time
-            await datastore.add_schedule(schedule, ConflictPolicy.replace)
-
-            schedules = await datastore.get_schedules({schedule.id})
-            assert schedules[0].task_id == "foo"
-            assert schedules[0].next_fire_time == next_fire_time
-            assert schedules[0].args == ()
-            assert schedules[0].kwargs == {}
-            assert schedules[0].coalesce is CoalescePolicy.earliest
-            assert schedules[0].misfire_grace_time is None
-            assert schedules[0].tags == frozenset()
-
-        received_event = events.pop(0)
-        assert received_event.schedule_id == "s3"
-        assert received_event.next_fire_time == datetime(
-            2020, 9, 15, tzinfo=timezone.utc
-        )
-        assert not events
-
-    async def test_remove_schedules(
-        self, datastore: AsyncDataStore, schedules: list[Schedule]
-    ) -> None:
-        async with self.capture_events(datastore, 2, {ScheduleRemoved}) as events:
-            for schedule in schedules:
-                await datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-            await datastore.remove_schedules(["s1", "s2"])
-            assert await datastore.get_schedules() == [schedules[2]]
-
-        received_event = events.pop(0)
-        assert received_event.schedule_id == "s1"
-
-        received_event = events.pop(0)
-        assert received_event.schedule_id == "s2"
-
-        assert not events
-
-    @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
-    async def test_acquire_release_schedules(
-        self, datastore: AsyncDataStore, schedules: list[Schedule]
-    ) -> None:
-        event_types = {ScheduleRemoved, ScheduleUpdated}
-        async with self.capture_events(datastore, 2, event_types) as events:
-            for schedule in schedules:
-                await datastore.add_schedule(schedule, ConflictPolicy.exception)
-
-            # The first scheduler gets the first due schedule
-            schedules1 = await datastore.acquire_schedules("dummy-id1", 1)
-            assert len(schedules1) == 1
-            assert schedules1[0].id == "s1"
-
-            # The second scheduler gets the second due schedule
-            schedules2 = await datastore.acquire_schedules("dummy-id2", 1)
-            assert len(schedules2) == 1
-            assert schedules2[0].id == "s2"
-
-            # The third scheduler gets nothing
-            schedules3 = await datastore.acquire_schedules("dummy-id3", 1)
-            assert not schedules3
-
-            # Update the schedules and check that the job store actually deletes the
-            # first one and updates the second one
-            schedules1[0].next_fire_time = None
-            schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
-
-            # Release all the schedules
-            await datastore.release_schedules("dummy-id1", schedules1)
-            await datastore.release_schedules("dummy-id2", schedules2)
-
-            # Check that the first schedule is gone
-            schedules = await datastore.get_schedules()
-            assert len(schedules) == 2
-            assert schedules[0].id == "s2"
-            assert schedules[1].id == "s3"
-
-        # Check for the appropriate update and delete events
-        received_event = events.pop(0)
-        assert isinstance(received_event, ScheduleRemoved)
-        assert received_event.schedule_id == "s1"
-
-        received_event = events.pop(0)
-        assert isinstance(received_event, ScheduleUpdated)
-        assert received_event.schedule_id == "s2"
-        assert received_event.next_fire_time == datetime(
-            2020, 9, 15, tzinfo=timezone.utc
-        )
-
-        assert not events
-
-    async def test_release_schedule_two_identical_fire_times(
-        self, datastore: AsyncDataStore
-    ) -> None:
-        """Regression test for #616."""
-        for i in range(1, 3):
-            trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
-            schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
-            schedule.next_fire_time = trigger.next()
-            await datastore.add_schedule(schedule, ConflictPolicy.exception)
+    job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+    await datastore.add_job(job)
 
-        schedules = await datastore.acquire_schedules("foo", 3)
-        schedules[0].next_fire_time = None
-        await datastore.release_schedules("foo", schedules)
-
-        remaining = await datastore.get_schedules({s.id for s in schedules})
-        assert len(remaining) == 1
-        assert remaining[0].id == schedules[1].id
-
-    async def test_release_two_schedules_at_once(
-        self, datastore: AsyncDataStore
-    ) -> None:
-        """Regression test for #621."""
-        for i in range(2):
-            trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
-            schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
-            schedule.next_fire_time = trigger.next()
-            await datastore.add_schedule(schedule, ConflictPolicy.exception)
+    acquired = await datastore.acquire_jobs("worker1", 2)
+    assert len(acquired) == 1
+    assert acquired[0].id == job.id
 
-        schedules = await datastore.acquire_schedules("foo", 3)
-        await datastore.release_schedules("foo", schedules)
-
-        remaining = await datastore.get_schedules({s.id for s in schedules})
-        assert len(remaining) == 2
-
-    async def test_acquire_schedules_lock_timeout(
-        self, datastore: AsyncDataStore, schedules: list[Schedule], freezer
-    ) -> None:
-        """
-        Test that a scheduler can acquire schedules that were acquired by another
-        scheduler but not released within the lock timeout period.
-
-        """
-        await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
-
-        # First, one scheduler acquires the first available schedule
-        acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
-        assert len(acquired1) == 1
-        assert acquired1[0].id == "s1"
-
-        # Try to acquire the schedule just at the threshold (now == acquired_until).
-        # This should not yield any schedules.
-        freezer.tick(30)
-        acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
-        assert not acquired2
-
-        # Right after that, the schedule should be available
-        freezer.tick(1)
-        acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
-        assert len(acquired3) == 1
-        assert acquired3[0].id == "s1"
-
-    async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None:
-        await datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        jobs = [Job(task_id="task1") for _ in range(2)]
-        for job in jobs:
-            await datastore.add_job(job)
-
-        # The first worker gets the first job in the queue
-        jobs1 = await datastore.acquire_jobs("worker1", 1)
-        assert len(jobs1) == 1
-        assert jobs1[0].id == jobs[0].id
-
-        # The second worker gets the second job
-        jobs2 = await datastore.acquire_jobs("worker2", 1)
-        assert len(jobs2) == 1
-        assert jobs2[0].id == jobs[1].id
-
-        # The third worker gets nothing
-        jobs3 = await datastore.acquire_jobs("worker3", 1)
-        assert not jobs3
-
-    async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
-        await datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
-        await datastore.add_job(job)
+    await datastore.release_job(
+        "worker1",
+        acquired[0].task_id,
+        JobResult.from_job(acquired[0], JobOutcome.cancelled),
+    )
+    result = await datastore.get_job_result(acquired[0].id)
+    assert result.outcome is JobOutcome.cancelled
+    assert result.exception is None
+    assert result.return_value is None
+
+    # Check that the job and its result are gone
+    assert not await datastore.get_jobs({acquired[0].id})
+    assert not await datastore.get_job_result(acquired[0].id)
+
+
+async def test_acquire_jobs_lock_timeout(
+    datastore: DataStore, freezer: FrozenDateTimeFactory
+) -> None:
+    """
+    Test that a worker can acquire jobs that were acquired by another scheduler but
+    not released within the lock timeout period.
+
+    """
+    await datastore.add_task(
+        Task(id="task1", func=asynccontextmanager, executor="async")
+    )
+    job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+    await datastore.add_job(job)
 
-        acquired = await datastore.acquire_jobs("worker_id", 2)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        await datastore.release_job(
-            "worker_id",
-            acquired[0].task_id,
-            JobResult.from_job(
-                acquired[0],
-                JobOutcome.success,
-                return_value="foo",
-            ),
-        )
-        result = await datastore.get_job_result(acquired[0].id)
-        assert result.outcome is JobOutcome.success
-        assert result.exception is None
-        assert result.return_value == "foo"
-
-        # Check that the job and its result are gone
-        assert not await datastore.get_jobs({acquired[0].id})
-        assert not await datastore.get_job_result(acquired[0].id)
-
-    async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
-        await datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
-        await datastore.add_job(job)
+    # First, one worker acquires the first available job
+    acquired = await datastore.acquire_jobs("worker1", 1)
+    assert len(acquired) == 1
+    assert acquired[0].id == job.id
 
-        acquired = await datastore.acquire_jobs("worker_id", 2)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        await datastore.release_job(
-            "worker_id",
-            acquired[0].task_id,
-            JobResult.from_job(
-                acquired[0],
-                JobOutcome.error,
-                exception=ValueError("foo"),
-            ),
-        )
-        result = await datastore.get_job_result(acquired[0].id)
-        assert result.outcome is JobOutcome.error
-        assert isinstance(result.exception, ValueError)
-        assert result.exception.args == ("foo",)
-        assert result.return_value is None
-
-        # Check that the job and its result are gone
-        assert not await datastore.get_jobs({acquired[0].id})
-        assert not await datastore.get_job_result(acquired[0].id)
-
-    async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
-        await datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
-        await datastore.add_job(job)
+    # Try to acquire the job just at the threshold (now == acquired_until).
+    # This should not yield any jobs.
+    freezer.tick(30)
+    assert not await datastore.acquire_jobs("worker2", 1)
 
-        acquired = await datastore.acquire_jobs("worker_id", 2)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        await datastore.release_job(
-            "worker_id",
-            acquired[0].task_id,
-            JobResult.from_job(
-                acquired[0],
-                JobOutcome.missed_start_deadline,
-            ),
-        )
-        result = await datastore.get_job_result(acquired[0].id)
-        assert result.outcome is JobOutcome.missed_start_deadline
-        assert result.exception is None
-        assert result.return_value is None
-
-        # Check that the job and its result are gone
-        assert not await datastore.get_jobs({acquired[0].id})
-        assert not await datastore.get_job_result(acquired[0].id)
-
-    async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
-        await datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+    # Right after that, the job should be available
+    freezer.tick(1)
+    acquired = await datastore.acquire_jobs("worker2", 1)
+    assert len(acquired) == 1
+    assert acquired[0].id == job.id
+
+
+async def test_acquire_jobs_max_number_exceeded(datastore: DataStore) -> None:
+    await datastore.add_task(
+        Task(id="task1", func=asynccontextmanager, executor="async", max_running_jobs=2)
+    )
+    jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
+    for job in jobs:
         await datastore.add_job(job)
 
-        acquired = await datastore.acquire_jobs("worker1", 2)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
+    # Check that only 2 jobs are returned from acquire_jobs() even though the limit
+    # was 3
+    acquired_jobs = await datastore.acquire_jobs("worker1", 3)
+    assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
+
+    # Release one job, and the worker should be able to acquire the third job
+    await datastore.release_job(
+        "worker1",
+        acquired_jobs[0].task_id,
+        JobResult.from_job(
+            acquired_jobs[0],
+            JobOutcome.success,
+            return_value=None,
+        ),
+    )
+    acquired_jobs = await datastore.acquire_jobs("worker1", 3)
+    assert [job.id for job in acquired_jobs] == [jobs[2].id]
 
-        await datastore.release_job(
-            "worker1",
-            acquired[0].task_id,
-            JobResult.from_job(acquired[0], JobOutcome.cancelled),
-        )
-        result = await datastore.get_job_result(acquired[0].id)
-        assert result.outcome is JobOutcome.cancelled
-        assert result.exception is None
-        assert result.return_value is None
-
-        # Check that the job and its result are gone
-        assert not await datastore.get_jobs({acquired[0].id})
-        assert not await datastore.get_job_result(acquired[0].id)
-
-    async def test_acquire_jobs_lock_timeout(
-        self, datastore: AsyncDataStore, freezer: FrozenDateTimeFactory
-    ) -> None:
-        """
-        Test that a worker can acquire jobs that were acquired by another scheduler but
-        not released within the lock timeout period.
-
-        """
-        await datastore.add_task(Task(id="task1", func=asynccontextmanager))
-        job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
-        await datastore.add_job(job)
-
-        # First, one worker acquires the first available job
-        acquired = await datastore.acquire_jobs("worker1", 1)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-        # Try to acquire the job just at the threshold (now == acquired_until).
-        # This should not yield any jobs.
-        freezer.tick(30)
-        assert not await datastore.acquire_jobs("worker2", 1)
-
-        # Right after that, the job should be available
-        freezer.tick(1)
-        acquired = await datastore.acquire_jobs("worker2", 1)
-        assert len(acquired) == 1
-        assert acquired[0].id == job.id
-
-    async def test_acquire_jobs_max_number_exceeded(
-        self, datastore: AsyncDataStore
-    ) -> None:
-        await datastore.add_task(
-            Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
-        )
-        jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
-        for job in jobs:
-            await datastore.add_job(job)
-
-        # Check that only 2 jobs are returned from acquire_jobs() even though the limit
-        # wqas 3
-        acquired_jobs = await datastore.acquire_jobs("worker1", 3)
-        assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
-
-        # Release one job, and the worker should be able to acquire the third job
-        await datastore.release_job(
-            "worker1",
-            acquired_jobs[0].task_id,
-            JobResult.from_job(
-                acquired_jobs[0],
-                JobOutcome.success,
-                return_value=None,
-            ),
-        )
-        acquired_jobs = await datastore.acquire_jobs("worker1", 3)
-        assert [job.id for job in acquired_jobs] == [jobs[2].id]
-
-    async def test_add_get_task(self, datastore: DataStore) -> None:
-        with pytest.raises(TaskLookupError):
-            await datastore.get_task("dummyid")
-
-        await datastore.add_task(Task(id="dummyid", func=asynccontextmanager))
-        task = await datastore.get_task("dummyid")
-        assert task.id == "dummyid"
-        assert task.func is asynccontextmanager
-
-    async def test_cancel_start(
-        self, raw_datastore: AsyncDataStore, event_broker: AsyncEventBroker
-    ) -> None:
-        with CancelScope() as scope:
-            scope.cancel()
-            await raw_datastore.start(event_broker)
-            await raw_datastore.stop()
-
-    async def test_cancel_stop(
-        self, raw_datastore: AsyncDataStore, event_broker: AsyncEventBroker
-    ) -> None:
-        with CancelScope() as scope:
-            await raw_datastore.start(event_broker)
+
+async def test_add_get_task(datastore: DataStore) -> None:
+    with pytest.raises(TaskLookupError):
+        await datastore.get_task("dummyid")
+
+    await datastore.add_task(
+        Task(id="dummyid", func=asynccontextmanager, executor="async")
+    )
+    task = await datastore.get_task("dummyid")
+    assert task.id == "dummyid"
+    assert task.func is asynccontextmanager
+
+
+async def test_cancel_start(
+    raw_datastore: DataStore, local_broker: EventBroker
+) -> None:
+    with CancelScope() as scope:
+        scope.cancel()
+        async with AsyncExitStack() as exit_stack:
+            await raw_datastore.start(exit_stack, local_broker)
+
+
+async def test_cancel_stop(raw_datastore: DataStore, local_broker: EventBroker) -> None:
+    with CancelScope() as scope:
+        async with AsyncExitStack() as exit_stack:
+            await raw_datastore.start(exit_stack, local_broker)
             scope.cancel()
-        await raw_datastore.stop()
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
index 09942f5..9b98f60 100644
--- a/tests/test_eventbrokers.py
+++ b/tests/test_eventbrokers.py
@@ -1,286 +1,107 @@
 from __future__ import annotations
 
-from collections.abc import AsyncGenerator, Generator
-from concurrent.futures import Future
+from contextlib import AsyncExitStack
 from datetime import datetime, timezone
-from queue import Empty, Queue
-from typing import Any, cast
 
 import pytest
-from _pytest.fixtures import SubRequest
 from _pytest.logging import LogCaptureFixture
 from anyio import CancelScope, create_memory_object_stream, fail_after
-from pytest_lazyfixture import lazy_fixture
 
 from apscheduler import Event, ScheduleAdded
-from apscheduler.abc import AsyncEventBroker, EventBroker, Serializer
import AsyncEventBroker, EventBroker, Serializer +from apscheduler.abc import EventBroker +pytestmark = pytest.mark.anyio -@pytest.fixture -def local_broker() -> EventBroker: - from apscheduler.eventbrokers.local import LocalEventBroker - return LocalEventBroker() - - -@pytest.fixture -def local_async_broker() -> AsyncEventBroker: - from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker - - return LocalAsyncEventBroker() - - -@pytest.fixture -def redis_broker(serializer: Serializer) -> EventBroker: - from apscheduler.eventbrokers.redis import RedisEventBroker - - broker = RedisEventBroker.from_url( - "redis://localhost:6379", serializer=serializer, stop_check_interval=0.05 +async def test_publish_subscribe(event_broker: EventBroker) -> None: + send, receive = create_memory_object_stream(2) + event_broker.subscribe(send.send) + event_broker.subscribe(send.send_nowait) + event = ScheduleAdded( + schedule_id="schedule1", + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), ) - return broker - - -@pytest.fixture -async def async_redis_broker(serializer: Serializer) -> AsyncEventBroker: - from apscheduler.eventbrokers.async_redis import AsyncRedisEventBroker - - broker = AsyncRedisEventBroker.from_url( - "redis://localhost:6379", serializer=serializer, stop_check_interval=0.05 + await event_broker.publish(event) + + with fail_after(3): + event1 = await receive.receive() + event2 = await receive.receive() + + assert event1 == event2 + assert isinstance(event1, ScheduleAdded) + assert isinstance(event1.timestamp, datetime) + assert event1.schedule_id == "schedule1" + assert event1.next_fire_time == datetime( + 2021, 9, 11, 12, 31, 56, 254867, timezone.utc ) - return broker -@pytest.fixture -def mqtt_broker(serializer: Serializer) -> EventBroker: - from paho.mqtt.client import Client - - from apscheduler.eventbrokers.mqtt import MQTTEventBroker - - return MQTTEventBroker(Client(), serializer=serializer) - - -@pytest.fixture -async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker: - from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker - - broker = AsyncpgEventBroker.from_dsn( - "postgres://postgres:secret@localhost:5432/testdb", serializer=serializer +async def test_subscribe_one_shot(event_broker: EventBroker) -> None: + send, receive = create_memory_object_stream(2) + event_broker.subscribe(send.send, one_shot=True) + event = ScheduleAdded( + schedule_id="schedule1", + next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), ) - yield broker - - -@pytest.fixture( - params=[ - pytest.param(lazy_fixture("local_broker"), id="local"), - pytest.param( - lazy_fixture("redis_broker"), - id="redis", - marks=[pytest.mark.external_service], - ), - pytest.param( - lazy_fixture("mqtt_broker"), id="mqtt", marks=[pytest.mark.external_service] - ), - ] -) -def broker(request: SubRequest) -> Generator[EventBroker, Any, None]: - request.param.start() - yield request.param - request.param.stop() - - -@pytest.fixture( - params=[ - pytest.param(lazy_fixture("local_async_broker"), id="local"), - pytest.param( - lazy_fixture("asyncpg_broker"), - id="asyncpg", - marks=[pytest.mark.external_service], - ), - pytest.param( - lazy_fixture("async_redis_broker"), - id="async_redis", - marks=[pytest.mark.external_service], - ), - ] -) -async def raw_async_broker(request: SubRequest) -> AsyncEventBroker: - return cast(AsyncEventBroker, request.param) - - -@pytest.fixture -async def async_broker( - raw_async_broker: AsyncEventBroker, -) -> 
AsyncGenerator[AsyncEventBroker, Any]: - await raw_async_broker.start() - yield raw_async_broker - await raw_async_broker.stop() - - -class TestEventBroker: - def test_publish_subscribe(self, broker: EventBroker) -> None: - queue: Queue[Event] = Queue() - broker.subscribe(queue.put_nowait) - broker.subscribe(queue.put_nowait) - event = ScheduleAdded( - schedule_id="schedule1", - next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), - ) - broker.publish(event) - event1 = queue.get(timeout=3) - event2 = queue.get(timeout=1) - - assert event1 == event2 - assert isinstance(event1, ScheduleAdded) - assert isinstance(event1.timestamp, datetime) - assert event1.schedule_id == "schedule1" - assert event1.next_fire_time == datetime( - 2021, 9, 11, 12, 31, 56, 254867, timezone.utc - ) - - def test_subscribe_one_shot(self, broker: EventBroker) -> None: - queue: Queue[Event] = Queue() - broker.subscribe(queue.put_nowait, one_shot=True) - event = ScheduleAdded( - schedule_id="schedule1", - next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), - ) - broker.publish(event) - event = ScheduleAdded( - schedule_id="schedule2", - next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc), - ) - broker.publish(event) - received_event = queue.get(timeout=3) - with pytest.raises(Empty): - queue.get(timeout=0.1) - - assert isinstance(received_event, ScheduleAdded) - assert received_event.schedule_id == "schedule1" - - def test_unsubscribe(self, broker: EventBroker, caplog) -> None: - queue: Queue[Event] = Queue() - subscription = broker.subscribe(queue.put_nowait) - broker.publish(Event()) - queue.get(timeout=3) - - subscription.unsubscribe() - broker.publish(Event()) - with pytest.raises(Empty): - queue.get(timeout=0.1) - - def test_publish_no_subscribers( - self, broker: EventBroker, caplog: LogCaptureFixture - ) -> None: - broker.publish(Event()) - assert not caplog.text - - def test_publish_exception( - self, broker: EventBroker, caplog: LogCaptureFixture - ) -> None: - def bad_subscriber(event: Event) -> None: - raise Exception("foo") - - timestamp = datetime.now(timezone.utc) - event_future: Future[Event] = Future() - broker.subscribe(bad_subscriber) - broker.subscribe(event_future.set_result) - broker.publish(Event(timestamp=timestamp)) - - event = event_future.result(3) - assert isinstance(event, Event) - assert event.timestamp == timestamp - assert "Error delivering Event" in caplog.text + await event_broker.publish(event) + event = ScheduleAdded( + schedule_id="schedule2", + next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc), + ) + await event_broker.publish(event) + with fail_after(3): + received_event = await receive.receive() -@pytest.mark.anyio -class TestAsyncEventBroker: - async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None: - send, receive = create_memory_object_stream(2) - async_broker.subscribe(send.send) - async_broker.subscribe(send.send_nowait) - event = ScheduleAdded( - schedule_id="schedule1", - next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), - ) - await async_broker.publish(event) + with pytest.raises(TimeoutError), fail_after(0.1): + await receive.receive() - with fail_after(3): - event1 = await receive.receive() - event2 = await receive.receive() + assert isinstance(received_event, ScheduleAdded) + assert received_event.schedule_id == "schedule1" - assert event1 == event2 - assert isinstance(event1, ScheduleAdded) - assert isinstance(event1.timestamp, datetime) - assert 
event1.schedule_id == "schedule1" - assert event1.next_fire_time == datetime( - 2021, 9, 11, 12, 31, 56, 254867, timezone.utc - ) - async def test_subscribe_one_shot(self, async_broker: AsyncEventBroker) -> None: - send, receive = create_memory_object_stream(2) - async_broker.subscribe(send.send, one_shot=True) - event = ScheduleAdded( - schedule_id="schedule1", - next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc), - ) - await async_broker.publish(event) - event = ScheduleAdded( - schedule_id="schedule2", - next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc), - ) - await async_broker.publish(event) +async def test_unsubscribe(event_broker: EventBroker) -> None: + send, receive = create_memory_object_stream() + subscription = event_broker.subscribe(send.send) + await event_broker.publish(Event()) + with fail_after(3): + await receive.receive() - with fail_after(3): - received_event = await receive.receive() + subscription.unsubscribe() + await event_broker.publish(Event()) + with pytest.raises(TimeoutError), fail_after(0.1): + await receive.receive() - with pytest.raises(TimeoutError), fail_after(0.1): - await receive.receive() - assert isinstance(received_event, ScheduleAdded) - assert received_event.schedule_id == "schedule1" +async def test_publish_no_subscribers(event_broker, caplog: LogCaptureFixture) -> None: + await event_broker.publish(Event()) + assert not caplog.text - async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None: - send, receive = create_memory_object_stream() - subscription = async_broker.subscribe(send.send) - await async_broker.publish(Event()) - with fail_after(3): - await receive.receive() - subscription.unsubscribe() - await async_broker.publish(Event()) - with pytest.raises(TimeoutError), fail_after(0.1): - await receive.receive() +async def test_publish_exception(event_broker, caplog: LogCaptureFixture) -> None: + def bad_subscriber(event: Event) -> None: + raise Exception("foo") - async def test_publish_no_subscribers( - self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture - ) -> None: - await async_broker.publish(Event()) - assert not caplog.text + timestamp = datetime.now(timezone.utc) + send, receive = create_memory_object_stream() + event_broker.subscribe(bad_subscriber) + event_broker.subscribe(send.send) + await event_broker.publish(Event(timestamp=timestamp)) - async def test_publish_exception( - self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture - ) -> None: - def bad_subscriber(event: Event) -> None: - raise Exception("foo") + received_event = await receive.receive() + assert received_event.timestamp == timestamp + assert "Error delivering Event" in caplog.text - timestamp = datetime.now(timezone.utc) - send, receive = create_memory_object_stream() - async_broker.subscribe(bad_subscriber) - async_broker.subscribe(send.send) - await async_broker.publish(Event(timestamp=timestamp)) - received_event = await receive.receive() - assert received_event.timestamp == timestamp - assert "Error delivering Event" in caplog.text +async def test_cancel_start(raw_event_broker: EventBroker) -> None: + with CancelScope() as scope: + scope.cancel() + async with AsyncExitStack() as exit_stack: + await raw_event_broker.start(exit_stack) - async def test_cancel_start(self, raw_async_broker: AsyncEventBroker) -> None: - with CancelScope() as scope: - scope.cancel() - await raw_async_broker.start() - await raw_async_broker.stop() - async def test_cancel_stop(self, raw_async_broker: 
AsyncEventBroker) -> None: - with CancelScope() as scope: - await raw_async_broker.start() +async def test_cancel_stop(raw_event_broker: EventBroker) -> None: + with CancelScope() as scope: + async with AsyncExitStack() as exit_stack: + await raw_event_broker.start(exit_stack) scope.cancel() - await raw_async_broker.stop() diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 32c1c29..30c8dc9 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -26,15 +26,14 @@ from apscheduler import ( SchedulerStopped, Task, TaskAdded, + current_async_scheduler, current_job, current_scheduler, - current_worker, ) from apscheduler.schedulers.async_ import AsyncScheduler from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger -from apscheduler.workers.async_ import AsyncWorker if sys.version_info >= (3, 9): from zoneinfo import ZoneInfo @@ -70,7 +69,7 @@ class TestAsyncScheduler: received_events: list[Event] = [] event = anyio.Event() trigger = DateTrigger(datetime.now(timezone.utc)) - async with AsyncScheduler(start_worker=False) as scheduler: + async with AsyncScheduler(process_jobs=False) as scheduler: scheduler.event_broker.subscribe(listener) await scheduler.add_schedule(dummy_async_job, trigger, id="foo") await scheduler.start_in_background() @@ -111,7 +110,7 @@ class TestAsyncScheduler: assert not received_events async def test_add_get_schedule(self) -> None: - async with AsyncScheduler(start_worker=False) as scheduler: + async with AsyncScheduler(process_jobs=False) as scheduler: with pytest.raises(ScheduleLookupError): await scheduler.get_schedule("dummyid") @@ -121,7 +120,7 @@ class TestAsyncScheduler: assert isinstance(schedule, Schedule) async def test_add_get_schedules(self) -> None: - async with AsyncScheduler(start_worker=False) as scheduler: + async with AsyncScheduler(process_jobs=False) as scheduler: assert await scheduler.get_schedules() == [] schedule1_id = await scheduler.add_schedule( @@ -161,7 +160,7 @@ class TestAsyncScheduler: orig_start_time = datetime.now(timezone) - timedelta(seconds=1) fake_uniform = mocker.patch("random.uniform") fake_uniform.configure_mock(side_effect=lambda a, b: jitter) - async with AsyncScheduler(start_worker=False) as scheduler: + async with AsyncScheduler(process_jobs=False) as scheduler: trigger = IntervalTrigger(seconds=3, start_time=orig_start_time) job_added_event = anyio.Event() scheduler.event_broker.subscribe(job_added_listener, {JobAdded}) @@ -263,8 +262,7 @@ class TestAsyncScheduler: async def test_contextvars(self) -> None: def check_contextvars() -> None: - assert current_scheduler.get() is scheduler - assert isinstance(current_worker.get(), AsyncWorker) + assert current_async_scheduler.get() is scheduler info = current_job.get() assert info.task_id == "task_id" assert info.schedule_id == "foo" @@ -277,7 +275,7 @@ class TestAsyncScheduler: start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10) async with AsyncScheduler() as scheduler: await scheduler.data_store.add_task( - Task(id="task_id", func=check_contextvars) + Task(id="task_id", func=check_contextvars, executor="async") ) job = Job( task_id="task_id", @@ -300,10 +298,7 @@ class TestAsyncScheduler: async def test_wait_until_stopped(self) -> None: async with AsyncScheduler() as scheduler: - trigger = DateTrigger( - datetime.now(timezone.utc) + timedelta(milliseconds=100) - ) - await scheduler.add_schedule(scheduler.stop, trigger) + await 
scheduler.add_job(scheduler.stop) await scheduler.wait_until_stopped() # This should be a no-op @@ -422,7 +417,7 @@ class TestSyncScheduler: # Check that the job was created with the proper amount of jitter in its # scheduled time - jobs = scheduler.data_store.get_jobs({job_id}) + jobs = scheduler._portal.call(scheduler.data_store.get_jobs, {job_id}) assert jobs[0].jitter == timedelta(seconds=jitter) assert jobs[0].scheduled_fire_time == orig_start_time + timedelta( seconds=jitter @@ -495,7 +490,6 @@ class TestSyncScheduler: def test_contextvars(self) -> None: def check_contextvars() -> None: assert current_scheduler.get() is scheduler - assert current_worker.get() is not None info = current_job.get() assert info.task_id == "task_id" assert info.schedule_id == "foo" @@ -507,7 +501,10 @@ class TestSyncScheduler: scheduled_fire_time = datetime.now(timezone.utc) start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10) with Scheduler() as scheduler: - scheduler.data_store.add_task(Task(id="task_id", func=check_contextvars)) + scheduler._portal.call( + scheduler.data_store.add_task, + Task(id="task_id", func=check_contextvars, executor="threadpool"), + ) job = Job( task_id="task_id", schedule_id="foo", @@ -517,7 +514,7 @@ class TestSyncScheduler: tags={"foo", "bar"}, result_expiration_time=timedelta(seconds=10), ) - scheduler.data_store.add_job(job) + scheduler._portal.call(scheduler.data_store.add_job, job) scheduler.start_in_background() result = scheduler.get_job_result(job.id) if result.outcome is JobOutcome.error: @@ -527,10 +524,7 @@ class TestSyncScheduler: def test_wait_until_stopped(self) -> None: with Scheduler() as scheduler: - trigger = DateTrigger( - datetime.now(timezone.utc) + timedelta(milliseconds=100) - ) - scheduler.add_schedule(scheduler.stop, trigger) + scheduler.add_job(scheduler.stop) scheduler.start_in_background() scheduler.wait_until_stopped() diff --git a/tests/test_workers.py b/tests/test_workers.py deleted file mode 100644 index aecc63b..0000000 --- a/tests/test_workers.py +++ /dev/null @@ -1,281 +0,0 @@ -from __future__ import annotations - -import threading -from datetime import datetime, timezone -from typing import Callable - -import anyio -import pytest -from anyio import fail_after - -from apscheduler import ( - Event, - Job, - JobAcquired, - JobAdded, - JobOutcome, - JobReleased, - Task, - TaskAdded, - WorkerStopped, -) -from apscheduler.datastores.memory import MemoryDataStore -from apscheduler.workers.async_ import AsyncWorker -from apscheduler.workers.sync import Worker - -pytestmark = pytest.mark.anyio - - -def sync_func(*args, fail: bool, **kwargs): - if fail: - raise Exception("failing as requested") - else: - return args, kwargs - - -async def async_func(*args, fail: bool, **kwargs): - if fail: - raise Exception("failing as requested") - else: - return args, kwargs - - -def fail_func(): - pytest.fail("This function should never be run") - - -class TestAsyncWorker: - @pytest.mark.parametrize( - "target_func", [sync_func, async_func], ids=["sync", "async"] - ) - @pytest.mark.parametrize("fail", [False, True], ids=["success", "fail"]) - async def test_run_job_nonscheduled_success( - self, target_func: Callable, fail: bool - ) -> None: - def listener(received_event: Event): - received_events.append(received_event) - if isinstance(received_event, JobReleased): - event.set() - - received_events: list[Event] = [] - event = anyio.Event() - async with AsyncWorker(MemoryDataStore()) as worker: - worker.event_broker.subscribe(listener) - await 
worker.data_store.add_task(Task(id="task_id", func=target_func)) - job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail}) - await worker.data_store.add_job(job) - with fail_after(3): - await event.wait() - - # First, a task was added - received_event = received_events.pop(0) - assert isinstance(received_event, TaskAdded) - assert received_event.task_id == "task_id" - - # Then a job was added - received_event = received_events.pop(0) - assert isinstance(received_event, JobAdded) - assert received_event.job_id == job.id - assert received_event.task_id == "task_id" - assert received_event.schedule_id is None - - # Then the job was started - received_event = received_events.pop(0) - assert isinstance(received_event, JobAcquired) - assert received_event.job_id == job.id - assert received_event.worker_id == worker.identity - - received_event = received_events.pop(0) - if fail: - # Then the job failed - assert isinstance(received_event, JobReleased) - assert received_event.outcome is JobOutcome.error - assert received_event.exception_type == "Exception" - assert received_event.exception_message == "failing as requested" - assert isinstance(received_event.exception_traceback, list) - assert all( - isinstance(line, str) for line in received_event.exception_traceback - ) - else: - # Then the job finished successfully - assert isinstance(received_event, JobReleased) - assert received_event.outcome is JobOutcome.success - assert received_event.exception_type is None - assert received_event.exception_message is None - assert received_event.exception_traceback is None - - # Finally, the worker was stopped - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStopped) - - # There should be no more events on the list - assert not received_events - - async def test_run_deadline_missed(self) -> None: - def listener(received_event: Event): - received_events.append(received_event) - if isinstance(received_event, JobReleased): - event.set() - - scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) - received_events: list[Event] = [] - event = anyio.Event() - async with AsyncWorker(MemoryDataStore()) as worker: - worker.event_broker.subscribe(listener) - await worker.data_store.add_task(Task(id="task_id", func=fail_func)) - job = Job( - task_id="task_id", - schedule_id="foo", - scheduled_fire_time=scheduled_start_time, - start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc), - ) - await worker.data_store.add_job(job) - with fail_after(3): - await event.wait() - - # First, a task was added - received_event = received_events.pop(0) - assert isinstance(received_event, TaskAdded) - assert received_event.task_id == "task_id" - - # Then a job was added - received_event = received_events.pop(0) - assert isinstance(received_event, JobAdded) - assert received_event.job_id == job.id - assert received_event.task_id == "task_id" - assert received_event.schedule_id == "foo" - - # The worker acquired the job - received_event = received_events.pop(0) - assert isinstance(received_event, JobAcquired) - assert received_event.job_id == job.id - assert received_event.worker_id == worker.identity - - # The worker determined that the deadline has been missed - received_event = received_events.pop(0) - assert isinstance(received_event, JobReleased) - assert received_event.outcome is JobOutcome.missed_start_deadline - assert received_event.job_id == job.id - assert received_event.worker_id == worker.identity - - # Finally, the worker was stopped - received_event = 
received_events.pop(0) - assert isinstance(received_event, WorkerStopped) - - # There should be no more events on the list - assert not received_events - - -class TestSyncWorker: - @pytest.mark.parametrize("fail", [False, True], ids=["success", "fail"]) - def test_run_job_nonscheduled(self, fail: bool) -> None: - def listener(received_event: Event): - received_events.append(received_event) - if isinstance(received_event, JobReleased): - event.set() - - received_events: list[Event] = [] - event = threading.Event() - with Worker(MemoryDataStore()) as worker: - worker.event_broker.subscribe(listener) - worker.data_store.add_task(Task(id="task_id", func=sync_func)) - job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail}) - worker.data_store.add_job(job) - event.wait(3) - - # First, a task was added - received_event = received_events.pop(0) - assert isinstance(received_event, TaskAdded) - assert received_event.task_id == "task_id" - - # Then a job was added - received_event = received_events.pop(0) - assert isinstance(received_event, JobAdded) - assert received_event.job_id == job.id - assert received_event.task_id == "task_id" - assert received_event.schedule_id is None - - # Then the job was started - received_event = received_events.pop(0) - assert isinstance(received_event, JobAcquired) - assert received_event.job_id == job.id - assert received_event.worker_id == worker.identity - - received_event = received_events.pop(0) - if fail: - # Then the job failed - assert isinstance(received_event, JobReleased) - assert received_event.outcome is JobOutcome.error - assert received_event.exception_type == "Exception" - assert received_event.exception_message == "failing as requested" - assert isinstance(received_event.exception_traceback, list) - assert all( - isinstance(line, str) for line in received_event.exception_traceback - ) - else: - # Then the job finished successfully - assert isinstance(received_event, JobReleased) - assert received_event.outcome is JobOutcome.success - assert received_event.exception_type is None - assert received_event.exception_message is None - assert received_event.exception_traceback is None - - # Finally, the worker was stopped - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStopped) - - # There should be no more events on the list - assert not received_events - - def test_run_deadline_missed(self) -> None: - def listener(received_event: Event): - received_events.append(received_event) - if isinstance(received_event, JobReleased): - event.set() - - scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) - received_events: list[Event] = [] - event = threading.Event() - with Worker(MemoryDataStore()) as worker: - worker.event_broker.subscribe(listener) - worker.data_store.add_task(Task(id="task_id", func=fail_func)) - job = Job( - task_id="task_id", - schedule_id="foo", - scheduled_fire_time=scheduled_start_time, - start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc), - ) - worker.data_store.add_job(job) - event.wait(3) - - # First, a task was added - received_event = received_events.pop(0) - assert isinstance(received_event, TaskAdded) - assert received_event.task_id == "task_id" - - # Then a job was added - received_event = received_events.pop(0) - assert isinstance(received_event, JobAdded) - assert received_event.job_id == job.id - assert received_event.task_id == "task_id" - assert received_event.schedule_id == "foo" - - # The worker acquired the job - received_event = 
received_events.pop(0) - assert isinstance(received_event, JobAcquired) - assert received_event.job_id == job.id - assert received_event.worker_id == worker.identity - - # The worker determined that the deadline has been missed - received_event = received_events.pop(0) - assert isinstance(received_event, JobReleased) - assert received_event.outcome is JobOutcome.missed_start_deadline - assert received_event.job_id == job.id - assert received_event.worker_id == worker.identity - - # Finally, the worker was stopped - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStopped) - - # There should be no more events on the list - assert not received_events -- cgit v1.2.1
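The conftest fixtures above all funnel into one async lifecycle: data stores and event brokers no longer have paired start()/stop() calls; start() takes an AsyncExitStack (plus, for stores, the event broker to publish through) and pushes its own teardown onto that stack. A minimal sketch of that wiring, using the in-memory implementations the tests already import:

    from contextlib import AsyncExitStack

    from apscheduler.datastores.memory import MemoryDataStore
    from apscheduler.eventbrokers.local import LocalEventBroker

    async def main() -> None:
        async with AsyncExitStack() as exit_stack:
            broker = LocalEventBroker()
            store = MemoryDataStore()
            # The broker starts first so the store can emit events through it;
            # cleanup callbacks run in reverse order when the block exits.
            await broker.start(exit_stack)
            await store.start(exit_stack, broker)
            ...  # use the store here

This is also why test_cancel_start and test_cancel_stop only wrap the async with block in a CancelScope: cancellation simply unwinds the exit stack, with no explicit stop() left to call.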
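test_acquire_jobs_max_number_exceeded doubles as a usage contract for the slot-limiting behavior: a task's max_running_jobs, not the batch size passed to acquire_jobs(), is the binding constraint. A sketch under the same assumptions as the test (a started store, the top-level model classes, asynccontextmanager as a stand-in callable):

    from contextlib import asynccontextmanager

    from apscheduler import Job, JobOutcome, JobResult, Task
    from apscheduler.abc import DataStore

    async def demo_slot_limit(store: DataStore) -> None:
        await store.add_task(
            Task(id="task1", func=asynccontextmanager, executor="async", max_running_jobs=2)
        )
        for _ in range(3):
            await store.add_job(Job(task_id="task1"))

        # Two of the three jobs come back, despite asking for up to three.
        batch = await store.acquire_jobs("worker1", 3)
        assert len(batch) == 2

        # Releasing one acquired job frees a slot for the remaining job.
        await store.release_job(
            "worker1",
            batch[0].task_id,
            JobResult.from_job(batch[0], JobOutcome.success, return_value=None),
        )
        assert len(await store.acquire_jobs("worker1", 3)) == 1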
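The deleted test_acquire_jobs_lock_timeout is still worth reading for its boundary condition: a job acquired by one worker only becomes acquirable by another strictly after its acquired_until timestamp, which is why that test got nothing at freezer.tick(30) (now == acquired_until under the default 30-second lock) and got the job one tick later. A hypothetical predicate capturing that rule (lock_expired is an illustrative name, not a real API):

    from __future__ import annotations

    from datetime import datetime

    def lock_expired(acquired_until: datetime | None, now: datetime) -> bool:
        # Equality is not enough: at now == acquired_until the lock still holds.
        return acquired_until is not None and acquired_until < now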
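In the event broker tests, anyio memory object streams replace the old queue.Queue mailboxes. That works because subscribers may be sync or async callables alike; test_publish_subscribe registers both send.send (awaited by the broker) and send.send_nowait (called directly). A sketch of the receiving side, assuming a started broker:

    from anyio import create_memory_object_stream, fail_after

    async def next_event(event_broker):
        send, receive = create_memory_object_stream(1)
        subscription = event_broker.subscribe(send.send)
        try:
            with fail_after(3):  # raise TimeoutError instead of hanging the test
                return await receive.receive()
        finally:
            subscription.unsubscribe()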
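One-shot subscriptions, per test_subscribe_one_shot, deliver exactly one event and then detach themselves. Roughly, under the same assumptions:

    from datetime import datetime, timezone

    from anyio import create_memory_object_stream, fail_after
    from apscheduler import ScheduleAdded

    async def first_event_only(event_broker) -> None:
        send, receive = create_memory_object_stream(2)
        event_broker.subscribe(send.send, one_shot=True)
        for schedule_id in ("schedule1", "schedule2"):
            await event_broker.publish(
                ScheduleAdded(
                    schedule_id=schedule_id,
                    next_fire_time=datetime.now(timezone.utc),
                )
            )
        # Only the first event arrives; the subscription removed itself after it.
        with fail_after(3):
            assert (await receive.receive()).schedule_id == "schedule1"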
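On the scheduler side, the sync Scheduler is now a thin wrapper over AsyncScheduler, so data store methods are coroutines even when reached from synchronous code; the tests hop through the scheduler's blocking portal. A sketch of that pattern, with the caveat that _portal is a private attribute the tests happen to rely on:

    from apscheduler.schedulers.sync import Scheduler

    def peek_jobs(scheduler: Scheduler, job_ids: set) -> list:
        # Run the coroutine on the scheduler's event loop and block for the result.
        return scheduler._portal.call(scheduler.data_store.get_jobs, job_ids)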
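The reworked test_wait_until_stopped shows a tidy self-termination idiom: instead of scheduling stop() with a DateTrigger, stop is submitted as a one-off job, so the scheduler shuts itself down as soon as that job is picked up:

    from apscheduler.schedulers.async_ import AsyncScheduler

    async def run_until_stopped() -> None:
        async with AsyncScheduler() as scheduler:
            await scheduler.add_job(scheduler.stop)
            await scheduler.wait_until_stopped()  # returns once the stop job has run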
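Finally, the contextvars tests spell out what job code can introspect now that current_worker is gone: current_scheduler/current_async_scheduler identify the owning scheduler, and current_job carries the running job's metadata. Assuming this function runs as a job under an AsyncScheduler:

    from apscheduler import current_async_scheduler, current_job

    def check_context() -> None:
        # Both contextvars are set around each job invocation.
        scheduler = current_async_scheduler.get()
        info = current_job.get()
        print(type(scheduler).__name__, info.task_id, info.schedule_id)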