summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-12 22:09:05 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-21 02:40:02 +0300
commitc5727432736b55b7d76753307f14efdb962c2edf (patch)
tree005bd129694b56bd601d65c4cdf43828cfcd4381 /tests
parent26c4db062145fcb4f623ecfda96c42ce2414e8e1 (diff)
downloadapscheduler-c5727432736b55b7d76753307f14efdb962c2edf.tar.gz
Major refactoring
- Made SyncScheduler a synchronous wrapper for AsyncScheduler - Removed workers as a user interface - Removed synchronous interfaces for data stores and event brokers and refactored existing implementations to use the async interface - Added the current_async_scheduler contextvar - Added job executors
Diffstat (limited to 'tests')
-rw-r--r--tests/conftest.py194
-rw-r--r--tests/test_datastores.py1435
-rw-r--r--tests/test_eventbrokers.py325
-rw-r--r--tests/test_schedulers.py36
-rw-r--r--tests/test_workers.py281
5 files changed, 710 insertions, 1561 deletions
diff --git a/tests/conftest.py b/tests/conftest.py
index 31ea9b0..7497b52 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,10 +1,16 @@
from __future__ import annotations
import sys
+from contextlib import AsyncExitStack
+from tempfile import TemporaryDirectory
+from typing import Any, AsyncGenerator, cast
import pytest
+from _pytest.fixtures import SubRequest
+from pytest_lazyfixture import lazy_fixture
-from apscheduler.abc import Serializer
+from apscheduler.abc import DataStore, EventBroker, Serializer
+from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.serializers.cbor import CBORSerializer
from apscheduler.serializers.json import JSONSerializer
from apscheduler.serializers.pickle import PickleSerializer
@@ -34,3 +40,189 @@ def serializer(request) -> Serializer | None:
@pytest.fixture
def anyio_backend() -> str:
return "asyncio"
+
+
+@pytest.fixture
+def local_broker() -> EventBroker:
+ from apscheduler.eventbrokers.local import LocalEventBroker
+
+ return LocalEventBroker()
+
+
+@pytest.fixture
+async def redis_broker(serializer: Serializer) -> EventBroker:
+ from apscheduler.eventbrokers.redis import RedisEventBroker
+
+ broker = RedisEventBroker.from_url(
+ "redis://localhost:6379", serializer=serializer, stop_check_interval=0.05
+ )
+ await broker.client.flushdb()
+ return broker
+
+
+@pytest.fixture
+def mqtt_broker(serializer: Serializer) -> EventBroker:
+ from paho.mqtt.client import Client
+
+ from apscheduler.eventbrokers.mqtt import MQTTEventBroker
+
+ return MQTTEventBroker(Client(), serializer=serializer)
+
+
+@pytest.fixture
+async def asyncpg_broker(serializer: Serializer) -> EventBroker:
+ from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
+
+ broker = AsyncpgEventBroker.from_dsn(
+ "postgres://postgres:secret@localhost:5432/testdb", serializer=serializer
+ )
+ return broker
+
+
+@pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("local_broker"), id="local"),
+ pytest.param(
+ lazy_fixture("asyncpg_broker"),
+ id="asyncpg",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("redis_broker"),
+ id="redis",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("mqtt_broker"), id="mqtt", marks=[pytest.mark.external_service]
+ ),
+ ]
+)
+async def raw_event_broker(request: SubRequest) -> EventBroker:
+ return cast(EventBroker, request.param)
+
+
+@pytest.fixture
+async def event_broker(
+ raw_event_broker: EventBroker,
+) -> AsyncGenerator[EventBroker, Any]:
+ async with AsyncExitStack() as exit_stack:
+ await raw_event_broker.start(exit_stack)
+ yield raw_event_broker
+
+
+@pytest.fixture
+def memory_store() -> DataStore:
+ yield MemoryDataStore()
+
+
+@pytest.fixture
+def mongodb_store() -> DataStore:
+ from pymongo import MongoClient
+
+ from apscheduler.datastores.mongodb import MongoDBDataStore
+
+ with MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) as client:
+ yield MongoDBDataStore(client, start_from_scratch=True)
+
+
+@pytest.fixture
+def sqlite_store() -> DataStore:
+ from sqlalchemy.future import create_engine
+
+ from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+ with TemporaryDirectory("sqlite_") as tempdir:
+ engine = create_engine(f"sqlite:///{tempdir}/test.db")
+ try:
+ yield SQLAlchemyDataStore(engine)
+ finally:
+ engine.dispose()
+
+
+@pytest.fixture
+def psycopg2_store() -> DataStore:
+ from sqlalchemy.future import create_engine
+
+ from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+ engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb")
+ try:
+ yield SQLAlchemyDataStore(engine, schema="alter", start_from_scratch=True)
+ finally:
+ engine.dispose()
+
+
+@pytest.fixture
+def mysql_store() -> DataStore:
+ from sqlalchemy.future import create_engine
+
+ from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+ engine = create_engine("mysql+pymysql://root:secret@localhost/testdb")
+ try:
+ yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+ finally:
+ engine.dispose()
+
+
+@pytest.fixture
+async def asyncpg_store() -> DataStore:
+ from sqlalchemy.ext.asyncio import create_async_engine
+
+ from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+ engine = create_async_engine(
+ "postgresql+asyncpg://postgres:secret@localhost/testdb", future=True
+ )
+ try:
+ yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+ finally:
+ await engine.dispose()
+
+
+@pytest.fixture
+async def asyncmy_store() -> DataStore:
+ from sqlalchemy.ext.asyncio import create_async_engine
+
+ from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+ engine = create_async_engine(
+ "mysql+asyncmy://root:secret@localhost/testdb?charset=utf8mb4", future=True
+ )
+ try:
+ yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+ finally:
+ await engine.dispose()
+
+
+@pytest.fixture(
+ params=[
+ pytest.param(
+ lazy_fixture("asyncpg_store"),
+ id="asyncpg",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("asyncmy_store"),
+ id="asyncmy",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("mongodb_store"),
+ id="mongodb",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+)
+async def raw_datastore(request: SubRequest) -> DataStore:
+ return cast(DataStore, request.param)
+
+
+@pytest.fixture
+async def datastore(
+ raw_datastore: DataStore, local_broker: EventBroker
+) -> AsyncGenerator[DataStore, Any]:
+ async with AsyncExitStack() as exit_stack:
+ await local_broker.start(exit_stack)
+ await raw_datastore.start(exit_stack, local_broker)
+ yield raw_datastore
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 9c9a0cf..618369a 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -1,18 +1,13 @@
from __future__ import annotations
-import threading
-from collections.abc import Generator
-from contextlib import asynccontextmanager, contextmanager
+from contextlib import AsyncExitStack, asynccontextmanager
from datetime import datetime, timedelta, timezone
-from tempfile import TemporaryDirectory
-from typing import Any, AsyncGenerator, cast
+from typing import AsyncGenerator
import anyio
import pytest
-from _pytest.fixtures import SubRequest
from anyio import CancelScope
from freezegun.api import FrozenDateTimeFactory
-from pytest_lazyfixture import lazy_fixture
from apscheduler import (
CoalescePolicy,
@@ -30,1055 +25,483 @@ from apscheduler import (
TaskLookupError,
TaskUpdated,
)
-from apscheduler.abc import AsyncDataStore, AsyncEventBroker, DataStore, EventBroker
-from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
-from apscheduler.datastores.memory import MemoryDataStore
-from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker
-from apscheduler.eventbrokers.local import LocalEventBroker
+from apscheduler.abc import DataStore, EventBroker
from apscheduler.triggers.date import DateTrigger
+pytestmark = pytest.mark.anyio
+
@pytest.fixture
-def memory_store() -> DataStore:
- yield MemoryDataStore()
+def schedules() -> list[Schedule]:
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule1 = Schedule(id="s1", task_id="task1", trigger=trigger)
+ schedule1.next_fire_time = trigger.next()
+ trigger = DateTrigger(datetime(2020, 9, 14, tzinfo=timezone.utc))
+ schedule2 = Schedule(id="s2", task_id="task2", trigger=trigger)
+ schedule2.next_fire_time = trigger.next()
-@pytest.fixture
-def adapted_memory_store() -> AsyncDataStore:
- store = MemoryDataStore()
- return AsyncDataStoreAdapter(store)
+ trigger = DateTrigger(datetime(2020, 9, 15, tzinfo=timezone.utc))
+ schedule3 = Schedule(id="s3", task_id="task1", trigger=trigger)
+ return [schedule1, schedule2, schedule3]
-@pytest.fixture
-def mongodb_store() -> DataStore:
- from pymongo import MongoClient
+@asynccontextmanager
+async def capture_events(
+ datastore: DataStore,
+ limit: int,
+ event_types: set[type[Event]] | None = None,
+) -> AsyncGenerator[list[Event], None]:
+ def listener(event: Event) -> None:
+ events.append(event)
+ if len(events) == limit:
+ limit_event.set()
+ subscription.unsubscribe()
+
+ events: list[Event] = []
+ limit_event = anyio.Event()
+ subscription = datastore._event_broker.subscribe(listener, event_types)
+ yield events
+ if limit:
+ with anyio.fail_after(3):
+ await limit_event.wait()
+
+
+async def test_add_replace_task(datastore: DataStore) -> None:
+ import math
+
+ event_types = {TaskAdded, TaskUpdated}
+ async with capture_events(datastore, 3, event_types) as events:
+ await datastore.add_task(Task(id="test_task", func=print, executor="async"))
+ await datastore.add_task(
+ Task(id="test_task2", func=math.ceil, executor="async")
+ )
+ await datastore.add_task(Task(id="test_task", func=repr, executor="async"))
- from apscheduler.datastores.mongodb import MongoDBDataStore
+ tasks = await datastore.get_tasks()
+ assert len(tasks) == 2
+ assert tasks[0].id == "test_task"
+ assert tasks[0].func is repr
+ assert tasks[1].id == "test_task2"
+ assert tasks[1].func is math.ceil
- with MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) as client:
- yield MongoDBDataStore(client, start_from_scratch=True)
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskAdded)
+ assert received_event.task_id == "test_task"
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskAdded)
+ assert received_event.task_id == "test_task2"
-@pytest.fixture
-def sqlite_store() -> DataStore:
- from sqlalchemy.future import create_engine
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskUpdated)
+ assert received_event.task_id == "test_task"
- from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+ assert not events
- with TemporaryDirectory("sqlite_") as tempdir:
- engine = create_engine(f"sqlite:///{tempdir}/test.db")
- try:
- yield SQLAlchemyDataStore(engine)
- finally:
- engine.dispose()
+async def test_add_schedules(datastore: DataStore, schedules: list[Schedule]) -> None:
+ async with capture_events(datastore, 3, {ScheduleAdded}) as events:
+ for schedule in schedules:
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
-@pytest.fixture
-def psycopg2_store() -> DataStore:
- from sqlalchemy.future import create_engine
+ assert await datastore.get_schedules() == schedules
+ assert await datastore.get_schedules({"s1", "s2", "s3"}) == schedules
+ assert await datastore.get_schedules({"s1"}) == [schedules[0]]
+ assert await datastore.get_schedules({"s2"}) == [schedules[1]]
+ assert await datastore.get_schedules({"s3"}) == [schedules[2]]
- from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+ for event, schedule in zip(events, schedules):
+ assert event.schedule_id == schedule.id
+ assert event.next_fire_time == schedule.next_fire_time
- engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb")
- try:
- yield SQLAlchemyDataStore(engine, schema="alter", start_from_scratch=True)
- finally:
- engine.dispose()
+async def test_replace_schedules(
+ datastore: DataStore, schedules: list[Schedule]
+) -> None:
+ async with capture_events(datastore, 1, {ScheduleUpdated}) as events:
+ for schedule in schedules:
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
-@pytest.fixture
-def mysql_store() -> DataStore:
- from sqlalchemy.future import create_engine
+ next_fire_time = schedules[2].trigger.next()
+ schedule = Schedule(
+ id="s3",
+ task_id="foo",
+ trigger=schedules[2].trigger,
+ args=(),
+ kwargs={},
+ coalesce=CoalescePolicy.earliest,
+ misfire_grace_time=None,
+ tags=frozenset(),
+ )
+ schedule.next_fire_time = next_fire_time
+ await datastore.add_schedule(schedule, ConflictPolicy.replace)
+
+ schedules = await datastore.get_schedules({schedule.id})
+ assert schedules[0].task_id == "foo"
+ assert schedules[0].next_fire_time == next_fire_time
+ assert schedules[0].args == ()
+ assert schedules[0].kwargs == {}
+ assert schedules[0].coalesce is CoalescePolicy.earliest
+ assert schedules[0].misfire_grace_time is None
+ assert schedules[0].tags == frozenset()
+
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s3"
+ assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
+ assert not events
+
+
+async def test_remove_schedules(
+ datastore: DataStore, schedules: list[Schedule]
+) -> None:
+ async with capture_events(datastore, 2, {ScheduleRemoved}) as events:
+ for schedule in schedules:
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
- from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+ await datastore.remove_schedules(["s1", "s2"])
+ assert await datastore.get_schedules() == [schedules[2]]
- engine = create_engine("mysql+pymysql://root:secret@localhost/testdb")
- try:
- yield SQLAlchemyDataStore(engine, start_from_scratch=True)
- finally:
- engine.dispose()
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s1"
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s2"
-@pytest.fixture
-async def asyncpg_store() -> AsyncDataStore:
- from sqlalchemy.ext.asyncio import create_async_engine
+ assert not events
- from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
- engine = create_async_engine(
- "postgresql+asyncpg://postgres:secret@localhost/testdb", future=True
+@pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
+async def test_acquire_release_schedules(
+ datastore: DataStore, schedules: list[Schedule]
+) -> None:
+ event_types = {ScheduleRemoved, ScheduleUpdated}
+ async with capture_events(datastore, 2, event_types) as events:
+ for schedule in schedules:
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ # The first scheduler gets the first due schedule
+ schedules1 = await datastore.acquire_schedules("dummy-id1", 1)
+ assert len(schedules1) == 1
+ assert schedules1[0].id == "s1"
+
+ # The second scheduler gets the second due schedule
+ schedules2 = await datastore.acquire_schedules("dummy-id2", 1)
+ assert len(schedules2) == 1
+ assert schedules2[0].id == "s2"
+
+ # The third scheduler gets nothing
+ schedules3 = await datastore.acquire_schedules("dummy-id3", 1)
+ assert not schedules3
+
+ # Update the schedules and check that the job store actually deletes the
+ # first one and updates the second one
+ schedules1[0].next_fire_time = None
+ schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
+
+ # Release all the schedules
+ await datastore.release_schedules("dummy-id1", schedules1)
+ await datastore.release_schedules("dummy-id2", schedules2)
+
+ # Check that the first schedule is gone
+ schedules = await datastore.get_schedules()
+ assert len(schedules) == 2
+ assert schedules[0].id == "s2"
+ assert schedules[1].id == "s3"
+
+ # Check for the appropriate update and delete events
+ received_event = events.pop(0)
+ assert isinstance(received_event, ScheduleRemoved)
+ assert received_event.schedule_id == "s1"
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, ScheduleUpdated)
+ assert received_event.schedule_id == "s2"
+ assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
+
+ assert not events
+
+
+async def test_release_schedule_two_identical_fire_times(datastore: DataStore) -> None:
+ """Regression test for #616."""
+ for i in range(1, 3):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ schedules = await datastore.acquire_schedules("foo", 3)
+ schedules[0].next_fire_time = None
+ await datastore.release_schedules("foo", schedules)
+
+ remaining = await datastore.get_schedules({s.id for s in schedules})
+ assert len(remaining) == 1
+ assert remaining[0].id == schedules[1].id
+
+
+async def test_release_two_schedules_at_once(datastore: DataStore) -> None:
+ """Regression test for #621."""
+ for i in range(2):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ schedules = await datastore.acquire_schedules("foo", 3)
+ await datastore.release_schedules("foo", schedules)
+
+ remaining = await datastore.get_schedules({s.id for s in schedules})
+ assert len(remaining) == 2
+
+
+async def test_acquire_schedules_lock_timeout(
+ datastore: DataStore, schedules: list[Schedule], freezer
+) -> None:
+ """
+ Test that a scheduler can acquire schedules that were acquired by another
+ scheduler but not released within the lock timeout period.
+
+ """
+ await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
+
+ # First, one scheduler acquires the first available schedule
+ acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
+ assert len(acquired1) == 1
+ assert acquired1[0].id == "s1"
+
+ # Try to acquire the schedule just at the threshold (now == acquired_until).
+ # This should not yield any schedules.
+ freezer.tick(30)
+ acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
+ assert not acquired2
+
+ # Right after that, the schedule should be available
+ freezer.tick(1)
+ acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
+ assert len(acquired3) == 1
+ assert acquired3[0].id == "s1"
+
+
+async def test_acquire_multiple_workers(datastore: DataStore) -> None:
+ await datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, executor="async")
)
- try:
- yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True)
- finally:
- await engine.dispose()
+ jobs = [Job(task_id="task1") for _ in range(2)]
+ for job in jobs:
+ await datastore.add_job(job)
+ # The first worker gets the first job in the queue
+ jobs1 = await datastore.acquire_jobs("worker1", 1)
+ assert len(jobs1) == 1
+ assert jobs1[0].id == jobs[0].id
-@pytest.fixture
-async def asyncmy_store() -> AsyncDataStore:
- from sqlalchemy.ext.asyncio import create_async_engine
+ # The second worker gets the second job
+ jobs2 = await datastore.acquire_jobs("worker2", 1)
+ assert len(jobs2) == 1
+ assert jobs2[0].id == jobs[1].id
+
+ # The third worker gets nothing
+ jobs3 = await datastore.acquire_jobs("worker3", 1)
+ assert not jobs3
- from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
- engine = create_async_engine(
- "mysql+asyncmy://root:secret@localhost/testdb?charset=utf8mb4", future=True
+async def test_job_release_success(datastore: DataStore) -> None:
+ await datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, executor="async")
)
- try:
- yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True)
- finally:
- await engine.dispose()
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
+ return_value="foo",
+ ),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.success
+ assert result.exception is None
+ assert result.return_value == "foo"
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
-@pytest.fixture
-def schedules() -> list[Schedule]:
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule1 = Schedule(id="s1", task_id="task1", trigger=trigger)
- schedule1.next_fire_time = trigger.next()
- trigger = DateTrigger(datetime(2020, 9, 14, tzinfo=timezone.utc))
- schedule2 = Schedule(id="s2", task_id="task2", trigger=trigger)
- schedule2.next_fire_time = trigger.next()
+async def test_job_release_failure(datastore: DataStore) -> None:
+ await datastore.add_task(
+ Task(id="task1", executor="async", func=asynccontextmanager)
+ )
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
+ exception=ValueError("foo"),
+ ),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.error
+ assert isinstance(result.exception, ValueError)
+ assert result.exception.args == ("foo",)
+ assert result.return_value is None
- trigger = DateTrigger(datetime(2020, 9, 15, tzinfo=timezone.utc))
- schedule3 = Schedule(id="s3", task_id="task1", trigger=trigger)
- return [schedule1, schedule2, schedule3]
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
-class TestDataStores:
- @contextmanager
- def capture_events(
- self,
- datastore: DataStore,
- limit: int,
- event_types: set[type[Event]] | None = None,
- ) -> Generator[list[Event], None, None]:
- def listener(event: Event) -> None:
- events.append(event)
- if len(events) == limit:
- limit_event.set()
- subscription.unsubscribe()
-
- events: list[Event] = []
- limit_event = threading.Event()
- subscription = datastore.events.subscribe(listener, event_types)
- yield events
- if limit:
- limit_event.wait(2)
-
- @pytest.fixture
- def event_broker(self) -> Generator[EventBroker, Any, None]:
- broker = LocalEventBroker()
- broker.start()
- yield broker
- broker.stop()
-
- @pytest.fixture(
- params=[
- pytest.param(lazy_fixture("memory_store"), id="memory"),
- pytest.param(lazy_fixture("sqlite_store"), id="sqlite"),
- pytest.param(
- lazy_fixture("mongodb_store"),
- id="mongodb",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("psycopg2_store"),
- id="psycopg2",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("mysql_store"),
- id="mysql",
- marks=[pytest.mark.external_service],
- ),
- ]
+async def test_job_release_missed_deadline(datastore: DataStore):
+ await datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, executor="async")
)
- def datastore(
- self, request: SubRequest, event_broker: EventBroker
- ) -> Generator[DataStore, Any, None]:
- datastore = cast(DataStore, request.param)
- datastore.start(event_broker)
- yield datastore
- datastore.stop()
-
- def test_add_replace_task(self, datastore: DataStore) -> None:
- import math
-
- event_types = {TaskAdded, TaskUpdated}
- with self.capture_events(datastore, 3, event_types) as events:
- datastore.add_task(Task(id="test_task", func=print))
- datastore.add_task(Task(id="test_task2", func=math.ceil))
- datastore.add_task(Task(id="test_task", func=repr))
-
- tasks = datastore.get_tasks()
- assert len(tasks) == 2
- assert tasks[0].id == "test_task"
- assert tasks[0].func is repr
- assert tasks[1].id == "test_task2"
- assert tasks[1].func is math.ceil
-
- received_event = events.pop(0)
- assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == "test_task"
-
- received_event = events.pop(0)
- assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == "test_task2"
-
- received_event = events.pop(0)
- assert isinstance(received_event, TaskUpdated)
- assert received_event.task_id == "test_task"
-
- assert not events
-
- def test_add_schedules(
- self, datastore: DataStore, schedules: list[Schedule]
- ) -> None:
- with self.capture_events(datastore, 3, {ScheduleAdded}) as events:
- for schedule in schedules:
- datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- assert datastore.get_schedules() == schedules
- assert datastore.get_schedules({"s1", "s2", "s3"}) == schedules
- assert datastore.get_schedules({"s1"}) == [schedules[0]]
- assert datastore.get_schedules({"s2"}) == [schedules[1]]
- assert datastore.get_schedules({"s3"}) == [schedules[2]]
-
- for event, schedule in zip(events, schedules):
- assert event.schedule_id == schedule.id
- assert event.next_fire_time == schedule.next_fire_time
-
- def test_replace_schedules(
- self, datastore: DataStore, schedules: list[Schedule]
- ) -> None:
- with self.capture_events(datastore, 1, {ScheduleUpdated}) as events:
- for schedule in schedules:
- datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- next_fire_time = schedules[2].trigger.next()
- schedule = Schedule(
- id="s3",
- task_id="foo",
- trigger=schedules[2].trigger,
- args=(),
- kwargs={},
- coalesce=CoalescePolicy.earliest,
- misfire_grace_time=None,
- tags=frozenset(),
- )
- schedule.next_fire_time = next_fire_time
- datastore.add_schedule(schedule, ConflictPolicy.replace)
-
- schedules = datastore.get_schedules({schedule.id})
- assert schedules[0].task_id == "foo"
- assert schedules[0].next_fire_time == next_fire_time
- assert schedules[0].args == ()
- assert schedules[0].kwargs == {}
- assert schedules[0].coalesce is CoalescePolicy.earliest
- assert schedules[0].misfire_grace_time is None
- assert schedules[0].tags == frozenset()
-
- received_event = events.pop(0)
- assert received_event.schedule_id == "s3"
- assert received_event.next_fire_time == datetime(
- 2020, 9, 15, tzinfo=timezone.utc
- )
- assert not events
-
- def test_remove_schedules(
- self, datastore: DataStore, schedules: list[Schedule]
- ) -> None:
- with self.capture_events(datastore, 2, {ScheduleRemoved}) as events:
- for schedule in schedules:
- datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- datastore.remove_schedules(["s1", "s2"])
- assert datastore.get_schedules() == [schedules[2]]
-
- received_event = events.pop(0)
- assert received_event.schedule_id == "s1"
-
- received_event = events.pop(0)
- assert received_event.schedule_id == "s2"
-
- assert not events
-
- @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
- def test_acquire_release_schedules(
- self, datastore: DataStore, schedules: list[Schedule]
- ) -> None:
- event_types = {ScheduleRemoved, ScheduleUpdated}
- with self.capture_events(datastore, 2, event_types) as events:
- for schedule in schedules:
- datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- # The first scheduler gets the first due schedule
- schedules1 = datastore.acquire_schedules("dummy-id1", 1)
- assert len(schedules1) == 1
- assert schedules1[0].id == "s1"
-
- # The second scheduler gets the second due schedule
- schedules2 = datastore.acquire_schedules("dummy-id2", 1)
- assert len(schedules2) == 1
- assert schedules2[0].id == "s2"
-
- # The third scheduler gets nothing
- schedules3 = datastore.acquire_schedules("dummy-id3", 1)
- assert not schedules3
-
- # Update the schedules and check that the job store actually deletes the
- # first one and updates the second one
- schedules1[0].next_fire_time = None
- schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
-
- # Release all the schedules
- datastore.release_schedules("dummy-id1", schedules1)
- datastore.release_schedules("dummy-id2", schedules2)
-
- # Check that the first schedule is gone
- schedules = datastore.get_schedules()
- assert len(schedules) == 2
- assert schedules[0].id == "s2"
- assert schedules[1].id == "s3"
-
- # Check for the appropriate update and delete events
- received_event = events.pop(0)
- assert isinstance(received_event, ScheduleRemoved)
- assert received_event.schedule_id == "s1"
-
- received_event = events.pop(0)
- assert isinstance(received_event, ScheduleUpdated)
- assert received_event.schedule_id == "s2"
- assert received_event.next_fire_time == datetime(
- 2020, 9, 15, tzinfo=timezone.utc
- )
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.missed_start_deadline
+ assert result.exception is None
+ assert result.return_value is None
- assert not events
-
- def test_release_schedule_two_identical_fire_times(
- self, datastore: DataStore
- ) -> None:
- """Regression test for #616."""
- for i in range(1, 3):
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
- schedule.next_fire_time = trigger.next()
- datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- schedules = datastore.acquire_schedules("foo", 3)
- schedules[0].next_fire_time = None
- datastore.release_schedules("foo", schedules)
-
- remaining = datastore.get_schedules({s.id for s in schedules})
- assert len(remaining) == 1
- assert remaining[0].id == schedules[1].id
-
- def test_release_two_schedules_at_once(self, datastore: DataStore) -> None:
- """Regression test for #621."""
- for i in range(2):
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
- schedule.next_fire_time = trigger.next()
- datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- schedules = datastore.acquire_schedules("foo", 3)
- datastore.release_schedules("foo", schedules)
-
- remaining = datastore.get_schedules({s.id for s in schedules})
- assert len(remaining) == 2
-
- def test_acquire_schedules_lock_timeout(
- self, datastore: DataStore, schedules: list[Schedule], freezer
- ) -> None:
- """
- Test that a scheduler can acquire schedules that were acquired by another
- scheduler but not released within the lock timeout period.
-
- """
- datastore.add_schedule(schedules[0], ConflictPolicy.exception)
-
- # First, one scheduler acquires the first available schedule
- acquired1 = datastore.acquire_schedules("dummy-id1", 1)
- assert len(acquired1) == 1
- assert acquired1[0].id == "s1"
-
- # Try to acquire the schedule just at the threshold (now == acquired_until).
- # This should not yield any schedules.
- freezer.tick(30)
- acquired2 = datastore.acquire_schedules("dummy-id2", 1)
- assert not acquired2
-
- # Right after that, the schedule should be available
- freezer.tick(1)
- acquired3 = datastore.acquire_schedules("dummy-id2", 1)
- assert len(acquired3) == 1
- assert acquired3[0].id == "s1"
-
- def test_acquire_multiple_workers(self, datastore: DataStore) -> None:
- datastore.add_task(Task(id="task1", func=asynccontextmanager))
- jobs = [Job(task_id="task1") for _ in range(2)]
- for job in jobs:
- datastore.add_job(job)
-
- # The first worker gets the first job in the queue
- jobs1 = datastore.acquire_jobs("worker1", 1)
- assert len(jobs1) == 1
- assert jobs1[0].id == jobs[0].id
-
- # The second worker gets the second job
- jobs2 = datastore.acquire_jobs("worker2", 1)
- assert len(jobs2) == 1
- assert jobs2[0].id == jobs[1].id
-
- # The third worker gets nothing
- jobs3 = datastore.acquire_jobs("worker3", 1)
- assert not jobs3
-
- def test_job_release_success(self, datastore: DataStore) -> None:
- datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
- datastore.add_job(job)
-
- acquired = datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult.from_job(
- acquired[0],
- JobOutcome.success,
- return_value="foo",
- ),
- )
- result = datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.success
- assert result.exception is None
- assert result.return_value == "foo"
-
- # Check that the job and its result are gone
- assert not datastore.get_jobs({acquired[0].id})
- assert not datastore.get_job_result(acquired[0].id)
-
- def test_job_release_failure(self, datastore: DataStore) -> None:
- datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
- datastore.add_job(job)
-
- acquired = datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult.from_job(
- acquired[0],
- JobOutcome.error,
- exception=ValueError("foo"),
- ),
- )
- result = datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.error
- assert isinstance(result.exception, ValueError)
- assert result.exception.args == ("foo",)
- assert result.return_value is None
-
- # Check that the job and its result are gone
- assert not datastore.get_jobs({acquired[0].id})
- assert not datastore.get_job_result(acquired[0].id)
-
- def test_job_release_missed_deadline(self, datastore: DataStore):
- datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
- datastore.add_job(job)
-
- acquired = datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult.from_job(
- acquired[0],
- JobOutcome.missed_start_deadline,
- ),
- )
- result = datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.missed_start_deadline
- assert result.exception is None
- assert result.return_value is None
-
- # Check that the job and its result are gone
- assert not datastore.get_jobs({acquired[0].id})
- assert not datastore.get_job_result(acquired[0].id)
-
- def test_job_release_cancelled(self, datastore: DataStore) -> None:
- datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
- datastore.add_job(job)
-
- acquired = datastore.acquire_jobs("worker1", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- datastore.release_job(
- "worker1",
- acquired[0].task_id,
- JobResult.from_job(
- acquired[0],
- JobOutcome.cancelled,
- ),
- )
- result = datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.cancelled
- assert result.exception is None
- assert result.return_value is None
-
- # Check that the job and its result are gone
- assert not datastore.get_jobs({acquired[0].id})
- assert not datastore.get_job_result(acquired[0].id)
-
- def test_acquire_jobs_lock_timeout(
- self, datastore: DataStore, freezer: FrozenDateTimeFactory
- ) -> None:
- """
- Test that a worker can acquire jobs that were acquired by another scheduler but
- not released within the lock timeout period.
-
- """
- datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- datastore.add_job(job)
-
- # First, one worker acquires the first available job
- acquired = datastore.acquire_jobs("worker1", 1)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- # Try to acquire the job just at the threshold (now == acquired_until).
- # This should not yield any jobs.
- freezer.tick(30)
- assert not datastore.acquire_jobs("worker2", 1)
-
- # Right after that, the job should be available
- freezer.tick(1)
- acquired = datastore.acquire_jobs("worker2", 1)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- def test_acquire_jobs_max_number_exceeded(self, datastore: DataStore) -> None:
- datastore.add_task(
- Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
- )
- jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
- for job in jobs:
- datastore.add_job(job)
-
- # Check that only 2 jobs are returned from acquire_jobs() even though the limit
- # wqas 3
- acquired_jobs = datastore.acquire_jobs("worker1", 3)
- assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
-
- # Release one job, and the worker should be able to acquire the third job
- datastore.release_job(
- "worker1",
- acquired_jobs[0].task_id,
- JobResult.from_job(
- acquired_jobs[0],
- JobOutcome.success,
- return_value=None,
- ),
- )
- acquired_jobs = datastore.acquire_jobs("worker1", 3)
- assert [job.id for job in acquired_jobs] == [jobs[2].id]
-
- def test_add_get_task(self, datastore: DataStore) -> None:
- with pytest.raises(TaskLookupError):
- datastore.get_task("dummyid")
-
- datastore.add_task(Task(id="dummyid", func=asynccontextmanager))
- task = datastore.get_task("dummyid")
- assert task.id == "dummyid"
- assert task.func is asynccontextmanager
-
-
-@pytest.mark.anyio
-class TestAsyncDataStores:
- @asynccontextmanager
- async def capture_events(
- self,
- datastore: AsyncDataStore,
- limit: int,
- event_types: set[type[Event]] | None = None,
- ) -> AsyncGenerator[list[Event], None]:
- def listener(event: Event) -> None:
- events.append(event)
- if len(events) == limit:
- limit_event.set()
- subscription.unsubscribe()
-
- events: list[Event] = []
- limit_event = anyio.Event()
- subscription = datastore.events.subscribe(listener, event_types)
- yield events
- if limit:
- with anyio.fail_after(3):
- await limit_event.wait()
-
- @pytest.fixture
- async def event_broker(self) -> AsyncGenerator[AsyncEventBroker, Any]:
- broker = LocalAsyncEventBroker()
- await broker.start()
- yield broker
- await broker.stop()
-
- @pytest.fixture(
- params=[
- pytest.param(lazy_fixture("adapted_memory_store"), id="memory"),
- pytest.param(
- lazy_fixture("asyncpg_store"),
- id="asyncpg",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("asyncmy_store"),
- id="asyncmy",
- marks=[pytest.mark.external_service],
- ),
- ]
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
+
+
+async def test_job_release_cancelled(datastore: DataStore) -> None:
+ await datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, executor="async")
)
- async def raw_datastore(
- self, request: SubRequest, event_broker: AsyncEventBroker
- ) -> AsyncDataStore:
- return cast(AsyncDataStore, request.param)
-
- @pytest.fixture
- async def datastore(
- self, raw_datastore: AsyncDataStore, event_broker: AsyncEventBroker
- ) -> AsyncGenerator[AsyncDataStore, Any]:
- await raw_datastore.start(event_broker)
- yield raw_datastore
- await raw_datastore.stop()
-
- async def test_add_replace_task(self, datastore: AsyncDataStore) -> None:
- import math
-
- event_types = {TaskAdded, TaskUpdated}
- async with self.capture_events(datastore, 3, event_types) as events:
- await datastore.add_task(Task(id="test_task", func=print))
- await datastore.add_task(Task(id="test_task2", func=math.ceil))
- await datastore.add_task(Task(id="test_task", func=repr))
-
- tasks = await datastore.get_tasks()
- assert len(tasks) == 2
- assert tasks[0].id == "test_task"
- assert tasks[0].func is repr
- assert tasks[1].id == "test_task2"
- assert tasks[1].func is math.ceil
-
- received_event = events.pop(0)
- assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == "test_task"
-
- received_event = events.pop(0)
- assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == "test_task2"
-
- received_event = events.pop(0)
- assert isinstance(received_event, TaskUpdated)
- assert received_event.task_id == "test_task"
-
- assert not events
-
- async def test_add_schedules(
- self, datastore: AsyncDataStore, schedules: list[Schedule]
- ) -> None:
- async with self.capture_events(datastore, 3, {ScheduleAdded}) as events:
- for schedule in schedules:
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- assert await datastore.get_schedules() == schedules
- assert await datastore.get_schedules({"s1", "s2", "s3"}) == schedules
- assert await datastore.get_schedules({"s1"}) == [schedules[0]]
- assert await datastore.get_schedules({"s2"}) == [schedules[1]]
- assert await datastore.get_schedules({"s3"}) == [schedules[2]]
-
- for event, schedule in zip(events, schedules):
- assert event.schedule_id == schedule.id
- assert event.next_fire_time == schedule.next_fire_time
-
- async def test_replace_schedules(
- self, datastore: AsyncDataStore, schedules: list[Schedule]
- ) -> None:
- async with self.capture_events(datastore, 1, {ScheduleUpdated}) as events:
- for schedule in schedules:
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- next_fire_time = schedules[2].trigger.next()
- schedule = Schedule(
- id="s3",
- task_id="foo",
- trigger=schedules[2].trigger,
- args=(),
- kwargs={},
- coalesce=CoalescePolicy.earliest,
- misfire_grace_time=None,
- tags=frozenset(),
- )
- schedule.next_fire_time = next_fire_time
- await datastore.add_schedule(schedule, ConflictPolicy.replace)
-
- schedules = await datastore.get_schedules({schedule.id})
- assert schedules[0].task_id == "foo"
- assert schedules[0].next_fire_time == next_fire_time
- assert schedules[0].args == ()
- assert schedules[0].kwargs == {}
- assert schedules[0].coalesce is CoalescePolicy.earliest
- assert schedules[0].misfire_grace_time is None
- assert schedules[0].tags == frozenset()
-
- received_event = events.pop(0)
- assert received_event.schedule_id == "s3"
- assert received_event.next_fire_time == datetime(
- 2020, 9, 15, tzinfo=timezone.utc
- )
- assert not events
-
- async def test_remove_schedules(
- self, datastore: AsyncDataStore, schedules: list[Schedule]
- ) -> None:
- async with self.capture_events(datastore, 2, {ScheduleRemoved}) as events:
- for schedule in schedules:
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- await datastore.remove_schedules(["s1", "s2"])
- assert await datastore.get_schedules() == [schedules[2]]
-
- received_event = events.pop(0)
- assert received_event.schedule_id == "s1"
-
- received_event = events.pop(0)
- assert received_event.schedule_id == "s2"
-
- assert not events
-
- @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
- async def test_acquire_release_schedules(
- self, datastore: AsyncDataStore, schedules: list[Schedule]
- ) -> None:
- event_types = {ScheduleRemoved, ScheduleUpdated}
- async with self.capture_events(datastore, 2, event_types) as events:
- for schedule in schedules:
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
-
- # The first scheduler gets the first due schedule
- schedules1 = await datastore.acquire_schedules("dummy-id1", 1)
- assert len(schedules1) == 1
- assert schedules1[0].id == "s1"
-
- # The second scheduler gets the second due schedule
- schedules2 = await datastore.acquire_schedules("dummy-id2", 1)
- assert len(schedules2) == 1
- assert schedules2[0].id == "s2"
-
- # The third scheduler gets nothing
- schedules3 = await datastore.acquire_schedules("dummy-id3", 1)
- assert not schedules3
-
- # Update the schedules and check that the job store actually deletes the
- # first one and updates the second one
- schedules1[0].next_fire_time = None
- schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
-
- # Release all the schedules
- await datastore.release_schedules("dummy-id1", schedules1)
- await datastore.release_schedules("dummy-id2", schedules2)
-
- # Check that the first schedule is gone
- schedules = await datastore.get_schedules()
- assert len(schedules) == 2
- assert schedules[0].id == "s2"
- assert schedules[1].id == "s3"
-
- # Check for the appropriate update and delete events
- received_event = events.pop(0)
- assert isinstance(received_event, ScheduleRemoved)
- assert received_event.schedule_id == "s1"
-
- received_event = events.pop(0)
- assert isinstance(received_event, ScheduleUpdated)
- assert received_event.schedule_id == "s2"
- assert received_event.next_fire_time == datetime(
- 2020, 9, 15, tzinfo=timezone.utc
- )
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+ await datastore.add_job(job)
- assert not events
+ acquired = await datastore.acquire_jobs("worker1", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
- async def test_release_schedule_two_identical_fire_times(
- self, datastore: AsyncDataStore
- ) -> None:
- """Regression test for #616."""
- for i in range(1, 3):
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
- schedule.next_fire_time = trigger.next()
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
+ await datastore.release_job(
+ "worker1",
+ acquired[0].task_id,
+ JobResult.from_job(acquired[0], JobOutcome.cancelled),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.cancelled
+ assert result.exception is None
+ assert result.return_value is None
+
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
+
+
+async def test_acquire_jobs_lock_timeout(
+ datastore: DataStore, freezer: FrozenDateTimeFactory
+) -> None:
+ """
+ Test that a worker can acquire jobs that were acquired by another scheduler but
+ not released within the lock timeout period.
+
+ """
+ await datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, executor="async")
+ )
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+ await datastore.add_job(job)
- schedules = await datastore.acquire_schedules("foo", 3)
- schedules[0].next_fire_time = None
- await datastore.release_schedules("foo", schedules)
-
- remaining = await datastore.get_schedules({s.id for s in schedules})
- assert len(remaining) == 1
- assert remaining[0].id == schedules[1].id
-
- async def test_release_two_schedules_at_once(
- self, datastore: AsyncDataStore
- ) -> None:
- """Regression test for #621."""
- for i in range(2):
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
- schedule.next_fire_time = trigger.next()
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
+ # First, one worker acquires the first available job
+ acquired = await datastore.acquire_jobs("worker1", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
- schedules = await datastore.acquire_schedules("foo", 3)
- await datastore.release_schedules("foo", schedules)
-
- remaining = await datastore.get_schedules({s.id for s in schedules})
- assert len(remaining) == 2
-
- async def test_acquire_schedules_lock_timeout(
- self, datastore: AsyncDataStore, schedules: list[Schedule], freezer
- ) -> None:
- """
- Test that a scheduler can acquire schedules that were acquired by another
- scheduler but not released within the lock timeout period.
-
- """
- await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
-
- # First, one scheduler acquires the first available schedule
- acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
- assert len(acquired1) == 1
- assert acquired1[0].id == "s1"
-
- # Try to acquire the schedule just at the threshold (now == acquired_until).
- # This should not yield any schedules.
- freezer.tick(30)
- acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
- assert not acquired2
-
- # Right after that, the schedule should be available
- freezer.tick(1)
- acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
- assert len(acquired3) == 1
- assert acquired3[0].id == "s1"
-
- async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- jobs = [Job(task_id="task1") for _ in range(2)]
- for job in jobs:
- await datastore.add_job(job)
-
- # The first worker gets the first job in the queue
- jobs1 = await datastore.acquire_jobs("worker1", 1)
- assert len(jobs1) == 1
- assert jobs1[0].id == jobs[0].id
-
- # The second worker gets the second job
- jobs2 = await datastore.acquire_jobs("worker2", 1)
- assert len(jobs2) == 1
- assert jobs2[0].id == jobs[1].id
-
- # The third worker gets nothing
- jobs3 = await datastore.acquire_jobs("worker3", 1)
- assert not jobs3
-
- async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
- await datastore.add_job(job)
+ # Try to acquire the job just at the threshold (now == acquired_until).
+ # This should not yield any jobs.
+ freezer.tick(30)
+ assert not await datastore.acquire_jobs("worker2", 1)
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult.from_job(
- acquired[0],
- JobOutcome.success,
- return_value="foo",
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.success
- assert result.exception is None
- assert result.return_value == "foo"
-
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
-
- async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
- await datastore.add_job(job)
+ # Right after that, the job should be available
+ freezer.tick(1)
+ acquired = await datastore.acquire_jobs("worker2", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult.from_job(
- acquired[0],
- JobOutcome.error,
- exception=ValueError("foo"),
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.error
- assert isinstance(result.exception, ValueError)
- assert result.exception.args == ("foo",)
- assert result.return_value is None
-
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
-
- async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
- await datastore.add_job(job)
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult.from_job(
- acquired[0],
- JobOutcome.missed_start_deadline,
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.missed_start_deadline
- assert result.exception is None
- assert result.return_value is None
-
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
-
- async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
+async def test_acquire_jobs_max_number_exceeded(datastore: DataStore) -> None:
+ await datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, executor="async", max_running_jobs=2)
+ )
+ jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
+ for job in jobs:
await datastore.add_job(job)
- acquired = await datastore.acquire_jobs("worker1", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
+ # Check that only 2 jobs are returned from acquire_jobs() even though the limit
+ # wqas 3
+ acquired_jobs = await datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
+
+ # Release one job, and the worker should be able to acquire the third job
+ await datastore.release_job(
+ "worker1",
+ acquired_jobs[0].task_id,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
+ return_value=None,
+ ),
+ )
+ acquired_jobs = await datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [jobs[2].id]
- await datastore.release_job(
- "worker1",
- acquired[0].task_id,
- JobResult.from_job(acquired[0], JobOutcome.cancelled),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.cancelled
- assert result.exception is None
- assert result.return_value is None
-
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
-
- async def test_acquire_jobs_lock_timeout(
- self, datastore: AsyncDataStore, freezer: FrozenDateTimeFactory
- ) -> None:
- """
- Test that a worker can acquire jobs that were acquired by another scheduler but
- not released within the lock timeout period.
-
- """
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
- await datastore.add_job(job)
- # First, one worker acquires the first available job
- acquired = await datastore.acquire_jobs("worker1", 1)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- # Try to acquire the job just at the threshold (now == acquired_until).
- # This should not yield any jobs.
- freezer.tick(30)
- assert not await datastore.acquire_jobs("worker2", 1)
-
- # Right after that, the job should be available
- freezer.tick(1)
- acquired = await datastore.acquire_jobs("worker2", 1)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- async def test_acquire_jobs_max_number_exceeded(
- self, datastore: AsyncDataStore
- ) -> None:
- await datastore.add_task(
- Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
- )
- jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
- for job in jobs:
- await datastore.add_job(job)
-
- # Check that only 2 jobs are returned from acquire_jobs() even though the limit
- # wqas 3
- acquired_jobs = await datastore.acquire_jobs("worker1", 3)
- assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
-
- # Release one job, and the worker should be able to acquire the third job
- await datastore.release_job(
- "worker1",
- acquired_jobs[0].task_id,
- JobResult.from_job(
- acquired_jobs[0],
- JobOutcome.success,
- return_value=None,
- ),
- )
- acquired_jobs = await datastore.acquire_jobs("worker1", 3)
- assert [job.id for job in acquired_jobs] == [jobs[2].id]
-
- async def test_add_get_task(self, datastore: DataStore) -> None:
- with pytest.raises(TaskLookupError):
- await datastore.get_task("dummyid")
-
- await datastore.add_task(Task(id="dummyid", func=asynccontextmanager))
- task = await datastore.get_task("dummyid")
- assert task.id == "dummyid"
- assert task.func is asynccontextmanager
-
- async def test_cancel_start(
- self, raw_datastore: AsyncDataStore, event_broker: AsyncEventBroker
- ) -> None:
- with CancelScope() as scope:
- scope.cancel()
- await raw_datastore.start(event_broker)
- await raw_datastore.stop()
-
- async def test_cancel_stop(
- self, raw_datastore: AsyncDataStore, event_broker: AsyncEventBroker
- ) -> None:
- with CancelScope() as scope:
- await raw_datastore.start(event_broker)
+async def test_add_get_task(datastore: DataStore) -> None:
+ with pytest.raises(TaskLookupError):
+ await datastore.get_task("dummyid")
+
+ await datastore.add_task(
+ Task(id="dummyid", func=asynccontextmanager, executor="async")
+ )
+ task = await datastore.get_task("dummyid")
+ assert task.id == "dummyid"
+ assert task.func is asynccontextmanager
+
+
+async def test_cancel_start(
+ raw_datastore: DataStore, local_broker: EventBroker
+) -> None:
+ with CancelScope() as scope:
+ scope.cancel()
+ async with AsyncExitStack() as exit_stack:
+ await raw_datastore.start(exit_stack, local_broker)
+
+
+async def test_cancel_stop(raw_datastore: DataStore, local_broker: EventBroker) -> None:
+ with CancelScope() as scope:
+ async with AsyncExitStack() as exit_stack:
+ await raw_datastore.start(exit_stack, local_broker)
scope.cancel()
- await raw_datastore.stop()
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
index 09942f5..9b98f60 100644
--- a/tests/test_eventbrokers.py
+++ b/tests/test_eventbrokers.py
@@ -1,286 +1,107 @@
from __future__ import annotations
-from collections.abc import AsyncGenerator, Generator
-from concurrent.futures import Future
+from contextlib import AsyncExitStack
from datetime import datetime, timezone
-from queue import Empty, Queue
-from typing import Any, cast
import pytest
-from _pytest.fixtures import SubRequest
from _pytest.logging import LogCaptureFixture
from anyio import CancelScope, create_memory_object_stream, fail_after
-from pytest_lazyfixture import lazy_fixture
from apscheduler import Event, ScheduleAdded
-from apscheduler.abc import AsyncEventBroker, EventBroker, Serializer
+from apscheduler.abc import EventBroker
+pytestmark = pytest.mark.anyio
-@pytest.fixture
-def local_broker() -> EventBroker:
- from apscheduler.eventbrokers.local import LocalEventBroker
- return LocalEventBroker()
-
-
-@pytest.fixture
-def local_async_broker() -> AsyncEventBroker:
- from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker
-
- return LocalAsyncEventBroker()
-
-
-@pytest.fixture
-def redis_broker(serializer: Serializer) -> EventBroker:
- from apscheduler.eventbrokers.redis import RedisEventBroker
-
- broker = RedisEventBroker.from_url(
- "redis://localhost:6379", serializer=serializer, stop_check_interval=0.05
+async def test_publish_subscribe(event_broker: EventBroker) -> None:
+ send, receive = create_memory_object_stream(2)
+ event_broker.subscribe(send.send)
+ event_broker.subscribe(send.send_nowait)
+ event = ScheduleAdded(
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
)
- return broker
-
-
-@pytest.fixture
-async def async_redis_broker(serializer: Serializer) -> AsyncEventBroker:
- from apscheduler.eventbrokers.async_redis import AsyncRedisEventBroker
-
- broker = AsyncRedisEventBroker.from_url(
- "redis://localhost:6379", serializer=serializer, stop_check_interval=0.05
+ await event_broker.publish(event)
+
+ with fail_after(3):
+ event1 = await receive.receive()
+ event2 = await receive.receive()
+
+ assert event1 == event2
+ assert isinstance(event1, ScheduleAdded)
+ assert isinstance(event1.timestamp, datetime)
+ assert event1.schedule_id == "schedule1"
+ assert event1.next_fire_time == datetime(
+ 2021, 9, 11, 12, 31, 56, 254867, timezone.utc
)
- return broker
-@pytest.fixture
-def mqtt_broker(serializer: Serializer) -> EventBroker:
- from paho.mqtt.client import Client
-
- from apscheduler.eventbrokers.mqtt import MQTTEventBroker
-
- return MQTTEventBroker(Client(), serializer=serializer)
-
-
-@pytest.fixture
-async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker:
- from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
-
- broker = AsyncpgEventBroker.from_dsn(
- "postgres://postgres:secret@localhost:5432/testdb", serializer=serializer
+async def test_subscribe_one_shot(event_broker: EventBroker) -> None:
+ send, receive = create_memory_object_stream(2)
+ event_broker.subscribe(send.send, one_shot=True)
+ event = ScheduleAdded(
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
)
- yield broker
-
-
-@pytest.fixture(
- params=[
- pytest.param(lazy_fixture("local_broker"), id="local"),
- pytest.param(
- lazy_fixture("redis_broker"),
- id="redis",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("mqtt_broker"), id="mqtt", marks=[pytest.mark.external_service]
- ),
- ]
-)
-def broker(request: SubRequest) -> Generator[EventBroker, Any, None]:
- request.param.start()
- yield request.param
- request.param.stop()
-
-
-@pytest.fixture(
- params=[
- pytest.param(lazy_fixture("local_async_broker"), id="local"),
- pytest.param(
- lazy_fixture("asyncpg_broker"),
- id="asyncpg",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("async_redis_broker"),
- id="async_redis",
- marks=[pytest.mark.external_service],
- ),
- ]
-)
-async def raw_async_broker(request: SubRequest) -> AsyncEventBroker:
- return cast(AsyncEventBroker, request.param)
-
-
-@pytest.fixture
-async def async_broker(
- raw_async_broker: AsyncEventBroker,
-) -> AsyncGenerator[AsyncEventBroker, Any]:
- await raw_async_broker.start()
- yield raw_async_broker
- await raw_async_broker.stop()
-
-
-class TestEventBroker:
- def test_publish_subscribe(self, broker: EventBroker) -> None:
- queue: Queue[Event] = Queue()
- broker.subscribe(queue.put_nowait)
- broker.subscribe(queue.put_nowait)
- event = ScheduleAdded(
- schedule_id="schedule1",
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
- )
- broker.publish(event)
- event1 = queue.get(timeout=3)
- event2 = queue.get(timeout=1)
-
- assert event1 == event2
- assert isinstance(event1, ScheduleAdded)
- assert isinstance(event1.timestamp, datetime)
- assert event1.schedule_id == "schedule1"
- assert event1.next_fire_time == datetime(
- 2021, 9, 11, 12, 31, 56, 254867, timezone.utc
- )
-
- def test_subscribe_one_shot(self, broker: EventBroker) -> None:
- queue: Queue[Event] = Queue()
- broker.subscribe(queue.put_nowait, one_shot=True)
- event = ScheduleAdded(
- schedule_id="schedule1",
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
- )
- broker.publish(event)
- event = ScheduleAdded(
- schedule_id="schedule2",
- next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
- )
- broker.publish(event)
- received_event = queue.get(timeout=3)
- with pytest.raises(Empty):
- queue.get(timeout=0.1)
-
- assert isinstance(received_event, ScheduleAdded)
- assert received_event.schedule_id == "schedule1"
-
- def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
- queue: Queue[Event] = Queue()
- subscription = broker.subscribe(queue.put_nowait)
- broker.publish(Event())
- queue.get(timeout=3)
-
- subscription.unsubscribe()
- broker.publish(Event())
- with pytest.raises(Empty):
- queue.get(timeout=0.1)
-
- def test_publish_no_subscribers(
- self, broker: EventBroker, caplog: LogCaptureFixture
- ) -> None:
- broker.publish(Event())
- assert not caplog.text
-
- def test_publish_exception(
- self, broker: EventBroker, caplog: LogCaptureFixture
- ) -> None:
- def bad_subscriber(event: Event) -> None:
- raise Exception("foo")
-
- timestamp = datetime.now(timezone.utc)
- event_future: Future[Event] = Future()
- broker.subscribe(bad_subscriber)
- broker.subscribe(event_future.set_result)
- broker.publish(Event(timestamp=timestamp))
-
- event = event_future.result(3)
- assert isinstance(event, Event)
- assert event.timestamp == timestamp
- assert "Error delivering Event" in caplog.text
+ await event_broker.publish(event)
+ event = ScheduleAdded(
+ schedule_id="schedule2",
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
+ )
+ await event_broker.publish(event)
+ with fail_after(3):
+ received_event = await receive.receive()
-@pytest.mark.anyio
-class TestAsyncEventBroker:
- async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None:
- send, receive = create_memory_object_stream(2)
- async_broker.subscribe(send.send)
- async_broker.subscribe(send.send_nowait)
- event = ScheduleAdded(
- schedule_id="schedule1",
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
- )
- await async_broker.publish(event)
+ with pytest.raises(TimeoutError), fail_after(0.1):
+ await receive.receive()
- with fail_after(3):
- event1 = await receive.receive()
- event2 = await receive.receive()
+ assert isinstance(received_event, ScheduleAdded)
+ assert received_event.schedule_id == "schedule1"
- assert event1 == event2
- assert isinstance(event1, ScheduleAdded)
- assert isinstance(event1.timestamp, datetime)
- assert event1.schedule_id == "schedule1"
- assert event1.next_fire_time == datetime(
- 2021, 9, 11, 12, 31, 56, 254867, timezone.utc
- )
- async def test_subscribe_one_shot(self, async_broker: AsyncEventBroker) -> None:
- send, receive = create_memory_object_stream(2)
- async_broker.subscribe(send.send, one_shot=True)
- event = ScheduleAdded(
- schedule_id="schedule1",
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
- )
- await async_broker.publish(event)
- event = ScheduleAdded(
- schedule_id="schedule2",
- next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
- )
- await async_broker.publish(event)
+async def test_unsubscribe(event_broker: EventBroker) -> None:
+ send, receive = create_memory_object_stream()
+ subscription = event_broker.subscribe(send.send)
+ await event_broker.publish(Event())
+ with fail_after(3):
+ await receive.receive()
- with fail_after(3):
- received_event = await receive.receive()
+ subscription.unsubscribe()
+ await event_broker.publish(Event())
+ with pytest.raises(TimeoutError), fail_after(0.1):
+ await receive.receive()
- with pytest.raises(TimeoutError), fail_after(0.1):
- await receive.receive()
- assert isinstance(received_event, ScheduleAdded)
- assert received_event.schedule_id == "schedule1"
+async def test_publish_no_subscribers(event_broker, caplog: LogCaptureFixture) -> None:
+ await event_broker.publish(Event())
+ assert not caplog.text
- async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
- send, receive = create_memory_object_stream()
- subscription = async_broker.subscribe(send.send)
- await async_broker.publish(Event())
- with fail_after(3):
- await receive.receive()
- subscription.unsubscribe()
- await async_broker.publish(Event())
- with pytest.raises(TimeoutError), fail_after(0.1):
- await receive.receive()
+async def test_publish_exception(event_broker, caplog: LogCaptureFixture) -> None:
+ def bad_subscriber(event: Event) -> None:
+ raise Exception("foo")
- async def test_publish_no_subscribers(
- self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture
- ) -> None:
- await async_broker.publish(Event())
- assert not caplog.text
+ timestamp = datetime.now(timezone.utc)
+ send, receive = create_memory_object_stream()
+ event_broker.subscribe(bad_subscriber)
+ event_broker.subscribe(send.send)
+ await event_broker.publish(Event(timestamp=timestamp))
- async def test_publish_exception(
- self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture
- ) -> None:
- def bad_subscriber(event: Event) -> None:
- raise Exception("foo")
+ received_event = await receive.receive()
+ assert received_event.timestamp == timestamp
+ assert "Error delivering Event" in caplog.text
- timestamp = datetime.now(timezone.utc)
- send, receive = create_memory_object_stream()
- async_broker.subscribe(bad_subscriber)
- async_broker.subscribe(send.send)
- await async_broker.publish(Event(timestamp=timestamp))
- received_event = await receive.receive()
- assert received_event.timestamp == timestamp
- assert "Error delivering Event" in caplog.text
+async def test_cancel_start(raw_event_broker: EventBroker) -> None:
+ with CancelScope() as scope:
+ scope.cancel()
+ async with AsyncExitStack() as exit_stack:
+ await raw_event_broker.start(exit_stack)
- async def test_cancel_start(self, raw_async_broker: AsyncEventBroker) -> None:
- with CancelScope() as scope:
- scope.cancel()
- await raw_async_broker.start()
- await raw_async_broker.stop()
- async def test_cancel_stop(self, raw_async_broker: AsyncEventBroker) -> None:
- with CancelScope() as scope:
- await raw_async_broker.start()
+async def test_cancel_stop(raw_event_broker: EventBroker) -> None:
+ with CancelScope() as scope:
+ async with AsyncExitStack() as exit_stack:
+ await raw_event_broker.start(exit_stack)
scope.cancel()
- await raw_async_broker.stop()
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 32c1c29..30c8dc9 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -26,15 +26,14 @@ from apscheduler import (
SchedulerStopped,
Task,
TaskAdded,
+ current_async_scheduler,
current_job,
current_scheduler,
- current_worker,
)
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
-from apscheduler.workers.async_ import AsyncWorker
if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo
@@ -70,7 +69,7 @@ class TestAsyncScheduler:
received_events: list[Event] = []
event = anyio.Event()
trigger = DateTrigger(datetime.now(timezone.utc))
- async with AsyncScheduler(start_worker=False) as scheduler:
+ async with AsyncScheduler(process_jobs=False) as scheduler:
scheduler.event_broker.subscribe(listener)
await scheduler.add_schedule(dummy_async_job, trigger, id="foo")
await scheduler.start_in_background()
@@ -111,7 +110,7 @@ class TestAsyncScheduler:
assert not received_events
async def test_add_get_schedule(self) -> None:
- async with AsyncScheduler(start_worker=False) as scheduler:
+ async with AsyncScheduler(process_jobs=False) as scheduler:
with pytest.raises(ScheduleLookupError):
await scheduler.get_schedule("dummyid")
@@ -121,7 +120,7 @@ class TestAsyncScheduler:
assert isinstance(schedule, Schedule)
async def test_add_get_schedules(self) -> None:
- async with AsyncScheduler(start_worker=False) as scheduler:
+ async with AsyncScheduler(process_jobs=False) as scheduler:
assert await scheduler.get_schedules() == []
schedule1_id = await scheduler.add_schedule(
@@ -161,7 +160,7 @@ class TestAsyncScheduler:
orig_start_time = datetime.now(timezone) - timedelta(seconds=1)
fake_uniform = mocker.patch("random.uniform")
fake_uniform.configure_mock(side_effect=lambda a, b: jitter)
- async with AsyncScheduler(start_worker=False) as scheduler:
+ async with AsyncScheduler(process_jobs=False) as scheduler:
trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
job_added_event = anyio.Event()
scheduler.event_broker.subscribe(job_added_listener, {JobAdded})
@@ -263,8 +262,7 @@ class TestAsyncScheduler:
async def test_contextvars(self) -> None:
def check_contextvars() -> None:
- assert current_scheduler.get() is scheduler
- assert isinstance(current_worker.get(), AsyncWorker)
+ assert current_async_scheduler.get() is scheduler
info = current_job.get()
assert info.task_id == "task_id"
assert info.schedule_id == "foo"
@@ -277,7 +275,7 @@ class TestAsyncScheduler:
start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10)
async with AsyncScheduler() as scheduler:
await scheduler.data_store.add_task(
- Task(id="task_id", func=check_contextvars)
+ Task(id="task_id", func=check_contextvars, executor="async")
)
job = Job(
task_id="task_id",
@@ -300,10 +298,7 @@ class TestAsyncScheduler:
async def test_wait_until_stopped(self) -> None:
async with AsyncScheduler() as scheduler:
- trigger = DateTrigger(
- datetime.now(timezone.utc) + timedelta(milliseconds=100)
- )
- await scheduler.add_schedule(scheduler.stop, trigger)
+ await scheduler.add_job(scheduler.stop)
await scheduler.wait_until_stopped()
# This should be a no-op
@@ -422,7 +417,7 @@ class TestSyncScheduler:
# Check that the job was created with the proper amount of jitter in its
# scheduled time
- jobs = scheduler.data_store.get_jobs({job_id})
+ jobs = scheduler._portal.call(scheduler.data_store.get_jobs, {job_id})
assert jobs[0].jitter == timedelta(seconds=jitter)
assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(
seconds=jitter
@@ -495,7 +490,6 @@ class TestSyncScheduler:
def test_contextvars(self) -> None:
def check_contextvars() -> None:
assert current_scheduler.get() is scheduler
- assert current_worker.get() is not None
info = current_job.get()
assert info.task_id == "task_id"
assert info.schedule_id == "foo"
@@ -507,7 +501,10 @@ class TestSyncScheduler:
scheduled_fire_time = datetime.now(timezone.utc)
start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10)
with Scheduler() as scheduler:
- scheduler.data_store.add_task(Task(id="task_id", func=check_contextvars))
+ scheduler._portal.call(
+ scheduler.data_store.add_task,
+ Task(id="task_id", func=check_contextvars, executor="threadpool"),
+ )
job = Job(
task_id="task_id",
schedule_id="foo",
@@ -517,7 +514,7 @@ class TestSyncScheduler:
tags={"foo", "bar"},
result_expiration_time=timedelta(seconds=10),
)
- scheduler.data_store.add_job(job)
+ scheduler._portal.call(scheduler.data_store.add_job, job)
scheduler.start_in_background()
result = scheduler.get_job_result(job.id)
if result.outcome is JobOutcome.error:
@@ -527,10 +524,7 @@ class TestSyncScheduler:
def test_wait_until_stopped(self) -> None:
with Scheduler() as scheduler:
- trigger = DateTrigger(
- datetime.now(timezone.utc) + timedelta(milliseconds=100)
- )
- scheduler.add_schedule(scheduler.stop, trigger)
+ scheduler.add_job(scheduler.stop)
scheduler.start_in_background()
scheduler.wait_until_stopped()
diff --git a/tests/test_workers.py b/tests/test_workers.py
deleted file mode 100644
index aecc63b..0000000
--- a/tests/test_workers.py
+++ /dev/null
@@ -1,281 +0,0 @@
-from __future__ import annotations
-
-import threading
-from datetime import datetime, timezone
-from typing import Callable
-
-import anyio
-import pytest
-from anyio import fail_after
-
-from apscheduler import (
- Event,
- Job,
- JobAcquired,
- JobAdded,
- JobOutcome,
- JobReleased,
- Task,
- TaskAdded,
- WorkerStopped,
-)
-from apscheduler.datastores.memory import MemoryDataStore
-from apscheduler.workers.async_ import AsyncWorker
-from apscheduler.workers.sync import Worker
-
-pytestmark = pytest.mark.anyio
-
-
-def sync_func(*args, fail: bool, **kwargs):
- if fail:
- raise Exception("failing as requested")
- else:
- return args, kwargs
-
-
-async def async_func(*args, fail: bool, **kwargs):
- if fail:
- raise Exception("failing as requested")
- else:
- return args, kwargs
-
-
-def fail_func():
- pytest.fail("This function should never be run")
-
-
-class TestAsyncWorker:
- @pytest.mark.parametrize(
- "target_func", [sync_func, async_func], ids=["sync", "async"]
- )
- @pytest.mark.parametrize("fail", [False, True], ids=["success", "fail"])
- async def test_run_job_nonscheduled_success(
- self, target_func: Callable, fail: bool
- ) -> None:
- def listener(received_event: Event):
- received_events.append(received_event)
- if isinstance(received_event, JobReleased):
- event.set()
-
- received_events: list[Event] = []
- event = anyio.Event()
- async with AsyncWorker(MemoryDataStore()) as worker:
- worker.event_broker.subscribe(listener)
- await worker.data_store.add_task(Task(id="task_id", func=target_func))
- job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail})
- await worker.data_store.add_job(job)
- with fail_after(3):
- await event.wait()
-
- # First, a task was added
- received_event = received_events.pop(0)
- assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == "task_id"
-
- # Then a job was added
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobAdded)
- assert received_event.job_id == job.id
- assert received_event.task_id == "task_id"
- assert received_event.schedule_id is None
-
- # Then the job was started
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobAcquired)
- assert received_event.job_id == job.id
- assert received_event.worker_id == worker.identity
-
- received_event = received_events.pop(0)
- if fail:
- # Then the job failed
- assert isinstance(received_event, JobReleased)
- assert received_event.outcome is JobOutcome.error
- assert received_event.exception_type == "Exception"
- assert received_event.exception_message == "failing as requested"
- assert isinstance(received_event.exception_traceback, list)
- assert all(
- isinstance(line, str) for line in received_event.exception_traceback
- )
- else:
- # Then the job finished successfully
- assert isinstance(received_event, JobReleased)
- assert received_event.outcome is JobOutcome.success
- assert received_event.exception_type is None
- assert received_event.exception_message is None
- assert received_event.exception_traceback is None
-
- # Finally, the worker was stopped
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStopped)
-
- # There should be no more events on the list
- assert not received_events
-
- async def test_run_deadline_missed(self) -> None:
- def listener(received_event: Event):
- received_events.append(received_event)
- if isinstance(received_event, JobReleased):
- event.set()
-
- scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
- received_events: list[Event] = []
- event = anyio.Event()
- async with AsyncWorker(MemoryDataStore()) as worker:
- worker.event_broker.subscribe(listener)
- await worker.data_store.add_task(Task(id="task_id", func=fail_func))
- job = Job(
- task_id="task_id",
- schedule_id="foo",
- scheduled_fire_time=scheduled_start_time,
- start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc),
- )
- await worker.data_store.add_job(job)
- with fail_after(3):
- await event.wait()
-
- # First, a task was added
- received_event = received_events.pop(0)
- assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == "task_id"
-
- # Then a job was added
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobAdded)
- assert received_event.job_id == job.id
- assert received_event.task_id == "task_id"
- assert received_event.schedule_id == "foo"
-
- # The worker acquired the job
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobAcquired)
- assert received_event.job_id == job.id
- assert received_event.worker_id == worker.identity
-
- # The worker determined that the deadline has been missed
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobReleased)
- assert received_event.outcome is JobOutcome.missed_start_deadline
- assert received_event.job_id == job.id
- assert received_event.worker_id == worker.identity
-
- # Finally, the worker was stopped
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStopped)
-
- # There should be no more events on the list
- assert not received_events
-
-
-class TestSyncWorker:
- @pytest.mark.parametrize("fail", [False, True], ids=["success", "fail"])
- def test_run_job_nonscheduled(self, fail: bool) -> None:
- def listener(received_event: Event):
- received_events.append(received_event)
- if isinstance(received_event, JobReleased):
- event.set()
-
- received_events: list[Event] = []
- event = threading.Event()
- with Worker(MemoryDataStore()) as worker:
- worker.event_broker.subscribe(listener)
- worker.data_store.add_task(Task(id="task_id", func=sync_func))
- job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail})
- worker.data_store.add_job(job)
- event.wait(3)
-
- # First, a task was added
- received_event = received_events.pop(0)
- assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == "task_id"
-
- # Then a job was added
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobAdded)
- assert received_event.job_id == job.id
- assert received_event.task_id == "task_id"
- assert received_event.schedule_id is None
-
- # Then the job was started
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobAcquired)
- assert received_event.job_id == job.id
- assert received_event.worker_id == worker.identity
-
- received_event = received_events.pop(0)
- if fail:
- # Then the job failed
- assert isinstance(received_event, JobReleased)
- assert received_event.outcome is JobOutcome.error
- assert received_event.exception_type == "Exception"
- assert received_event.exception_message == "failing as requested"
- assert isinstance(received_event.exception_traceback, list)
- assert all(
- isinstance(line, str) for line in received_event.exception_traceback
- )
- else:
- # Then the job finished successfully
- assert isinstance(received_event, JobReleased)
- assert received_event.outcome is JobOutcome.success
- assert received_event.exception_type is None
- assert received_event.exception_message is None
- assert received_event.exception_traceback is None
-
- # Finally, the worker was stopped
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStopped)
-
- # There should be no more events on the list
- assert not received_events
-
- def test_run_deadline_missed(self) -> None:
- def listener(received_event: Event):
- received_events.append(received_event)
- if isinstance(received_event, JobReleased):
- event.set()
-
- scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
- received_events: list[Event] = []
- event = threading.Event()
- with Worker(MemoryDataStore()) as worker:
- worker.event_broker.subscribe(listener)
- worker.data_store.add_task(Task(id="task_id", func=fail_func))
- job = Job(
- task_id="task_id",
- schedule_id="foo",
- scheduled_fire_time=scheduled_start_time,
- start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc),
- )
- worker.data_store.add_job(job)
- event.wait(3)
-
- # First, a task was added
- received_event = received_events.pop(0)
- assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == "task_id"
-
- # Then a job was added
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobAdded)
- assert received_event.job_id == job.id
- assert received_event.task_id == "task_id"
- assert received_event.schedule_id == "foo"
-
- # The worker acquired the job
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobAcquired)
- assert received_event.job_id == job.id
- assert received_event.worker_id == worker.identity
-
- # The worker determined that the deadline has been missed
- received_event = received_events.pop(0)
- assert isinstance(received_event, JobReleased)
- assert received_event.outcome is JobOutcome.missed_start_deadline
- assert received_event.job_id == job.id
- assert received_event.worker_id == worker.identity
-
- # Finally, the worker was stopped
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStopped)
-
- # There should be no more events on the list
- assert not received_events