summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-06-09 12:40:55 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-07-19 00:48:51 +0300
commitf215c1ab45959095f6b499eb7b26356c5937ee8b (patch)
tree552687a0ed6e799b3da96eec5cd3fbb14d19f1b5 /tests
parente3158fdf59a7c92a9449a566a2b746a4024e582f (diff)
downloadapscheduler-f215c1ab45959095f6b499eb7b26356c5937ee8b.tar.gz
Added support for starting the sync scheduler (and worker) without the context manager
Diffstat (limited to 'tests')
-rw-r--r--tests/test_datastores.py947
-rw-r--r--tests/test_eventbrokers.py189
-rw-r--r--tests/test_schedulers.py40
-rw-r--r--tests/test_workers.py63
4 files changed, 820 insertions, 419 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index d6376ea..ea5416e 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -1,19 +1,31 @@
from __future__ import annotations
-from contextlib import asynccontextmanager
+import threading
+from collections.abc import Generator
+from contextlib import asynccontextmanager, contextmanager
from datetime import datetime, timezone
from tempfile import TemporaryDirectory
-from typing import AsyncGenerator
+from typing import Any, AsyncGenerator, cast
import anyio
import pytest
+from _pytest.fixtures import SubRequest
from freezegun.api import FrozenDateTimeFactory
from pytest_lazyfixture import lazy_fixture
-from apscheduler.abc import AsyncDataStore, DataStore, Job, Schedule
+from apscheduler.abc import (
+ AsyncDataStore,
+ AsyncEventBroker,
+ DataStore,
+ EventBroker,
+ Job,
+ Schedule,
+)
from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome
+from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker
+from apscheduler.eventbrokers.local import LocalEventBroker
from apscheduler.events import (
Event,
ScheduleAdded,
@@ -32,6 +44,12 @@ def memory_store() -> DataStore:
@pytest.fixture
+def adapted_memory_store() -> AsyncDataStore:
+ store = MemoryDataStore()
+ return AsyncDataStoreAdapter(store)
+
+
+@pytest.fixture
def mongodb_store() -> DataStore:
from pymongo import MongoClient
@@ -96,39 +114,6 @@ async def asyncpg_store() -> AsyncDataStore:
await engine.dispose()
-@pytest.fixture(
- params=[
- pytest.param(lazy_fixture("memory_store"), id="memory"),
- pytest.param(lazy_fixture("sqlite_store"), id="sqlite"),
- pytest.param(
- lazy_fixture("mongodb_store"),
- id="mongodb",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("psycopg2_store"),
- id="psycopg2",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("asyncpg_store"),
- id="asyncpg",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("mysql_store"),
- id="mysql",
- marks=[pytest.mark.external_service],
- ),
- ]
-)
-async def datastore(request):
- if isinstance(request.param, DataStore):
- return AsyncDataStoreAdapter(request.param)
- else:
- return request.param
-
-
@pytest.fixture
def schedules() -> list[Schedule]:
trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
@@ -144,32 +129,504 @@ def schedules() -> list[Schedule]:
return [schedule1, schedule2, schedule3]
-@asynccontextmanager
-async def capture_events(
- datastore: AsyncDataStore, limit: int, event_types: set[type[Event]] | None = None
-) -> AsyncGenerator[list[Event], None]:
- def listener(event: Event) -> None:
- events.append(event)
- if len(events) == limit:
- limit_event.set()
- subscription.unsubscribe()
+class TestDataStores:
+ @contextmanager
+ def capture_events(
+ self,
+ datastore: DataStore,
+ limit: int,
+ event_types: set[type[Event]] | None = None,
+ ) -> Generator[list[Event], None, None]:
+ def listener(event: Event) -> None:
+ events.append(event)
+ if len(events) == limit:
+ limit_event.set()
+ subscription.unsubscribe()
+
+ events: list[Event] = []
+ limit_event = threading.Event()
+ subscription = datastore.events.subscribe(listener, event_types)
+ yield events
+ if limit:
+ limit_event.wait(2)
+
+ @pytest.fixture
+ def event_broker(self) -> Generator[EventBroker, Any, None]:
+ broker = LocalEventBroker()
+ broker.start()
+ yield broker
+ broker.stop()
+
+ @pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("memory_store"), id="memory"),
+ pytest.param(lazy_fixture("sqlite_store"), id="sqlite"),
+ pytest.param(
+ lazy_fixture("mongodb_store"),
+ id="mongodb",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("psycopg2_store"),
+ id="psycopg2",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("mysql_store"),
+ id="mysql",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+ )
+ def datastore(
+ self, request: SubRequest, event_broker: EventBroker
+ ) -> Generator[DataStore, Any, None]:
+ datastore = cast(DataStore, request.param)
+ datastore.start(event_broker)
+ yield datastore
+ datastore.stop()
+
+ def test_add_replace_task(self, datastore: DataStore) -> None:
+ import math
- events: list[Event] = []
- limit_event = anyio.Event()
- subscription = datastore.events.subscribe(listener, event_types)
- yield events
- if limit:
- with anyio.fail_after(3):
- await limit_event.wait()
+ event_types = {TaskAdded, TaskUpdated}
+ with self.capture_events(datastore, 3, event_types) as events:
+ datastore.add_task(Task(id="test_task", func=print))
+ datastore.add_task(Task(id="test_task2", func=math.ceil))
+ datastore.add_task(Task(id="test_task", func=repr))
+
+ tasks = datastore.get_tasks()
+ assert len(tasks) == 2
+ assert tasks[0].id == "test_task"
+ assert tasks[0].func is repr
+ assert tasks[1].id == "test_task2"
+ assert tasks[1].func is math.ceil
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskAdded)
+ assert received_event.task_id == "test_task"
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskAdded)
+ assert received_event.task_id == "test_task2"
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskUpdated)
+ assert received_event.task_id == "test_task"
+
+ assert not events
+
+ def test_add_schedules(
+ self, datastore: DataStore, schedules: list[Schedule]
+ ) -> None:
+ with self.capture_events(datastore, 3, {ScheduleAdded}) as events:
+ for schedule in schedules:
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ assert datastore.get_schedules() == schedules
+ assert datastore.get_schedules({"s1", "s2", "s3"}) == schedules
+ assert datastore.get_schedules({"s1"}) == [schedules[0]]
+ assert datastore.get_schedules({"s2"}) == [schedules[1]]
+ assert datastore.get_schedules({"s3"}) == [schedules[2]]
+
+ for event, schedule in zip(events, schedules):
+ assert event.schedule_id == schedule.id
+ assert event.next_fire_time == schedule.next_fire_time
+
+ def test_replace_schedules(
+ self, datastore: DataStore, schedules: list[Schedule]
+ ) -> None:
+ with self.capture_events(datastore, 1, {ScheduleUpdated}) as events:
+ for schedule in schedules:
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ next_fire_time = schedules[2].trigger.next()
+ schedule = Schedule(
+ id="s3",
+ task_id="foo",
+ trigger=schedules[2].trigger,
+ args=(),
+ kwargs={},
+ coalesce=CoalescePolicy.earliest,
+ misfire_grace_time=None,
+ tags=frozenset(),
+ )
+ schedule.next_fire_time = next_fire_time
+ datastore.add_schedule(schedule, ConflictPolicy.replace)
+
+ schedules = datastore.get_schedules({schedule.id})
+ assert schedules[0].task_id == "foo"
+ assert schedules[0].next_fire_time == next_fire_time
+ assert schedules[0].args == ()
+ assert schedules[0].kwargs == {}
+ assert schedules[0].coalesce is CoalescePolicy.earliest
+ assert schedules[0].misfire_grace_time is None
+ assert schedules[0].tags == frozenset()
+
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s3"
+ assert received_event.next_fire_time == datetime(
+ 2020, 9, 15, tzinfo=timezone.utc
+ )
+ assert not events
+
+ def test_remove_schedules(
+ self, datastore: DataStore, schedules: list[Schedule]
+ ) -> None:
+ with self.capture_events(datastore, 2, {ScheduleRemoved}) as events:
+ for schedule in schedules:
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ datastore.remove_schedules(["s1", "s2"])
+ assert datastore.get_schedules() == [schedules[2]]
+
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s1"
+
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s2"
+
+ assert not events
+
+ @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
+ def test_acquire_release_schedules(
+ self, datastore: DataStore, schedules: list[Schedule]
+ ) -> None:
+ event_types = {ScheduleRemoved, ScheduleUpdated}
+ with self.capture_events(datastore, 2, event_types) as events:
+ for schedule in schedules:
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ # The first scheduler gets the first due schedule
+ schedules1 = datastore.acquire_schedules("dummy-id1", 1)
+ assert len(schedules1) == 1
+ assert schedules1[0].id == "s1"
+
+ # The second scheduler gets the second due schedule
+ schedules2 = datastore.acquire_schedules("dummy-id2", 1)
+ assert len(schedules2) == 1
+ assert schedules2[0].id == "s2"
+
+ # The third scheduler gets nothing
+ schedules3 = datastore.acquire_schedules("dummy-id3", 1)
+ assert not schedules3
+
+ # Update the schedules and check that the job store actually deletes the first
+ # one and updates the second one
+ schedules1[0].next_fire_time = None
+ schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
+
+ # Release all the schedules
+ datastore.release_schedules("dummy-id1", schedules1)
+ datastore.release_schedules("dummy-id2", schedules2)
+
+ # Check that the first schedule is gone
+ schedules = datastore.get_schedules()
+ assert len(schedules) == 2
+ assert schedules[0].id == "s2"
+ assert schedules[1].id == "s3"
+
+ # Check for the appropriate update and delete events
+ received_event = events.pop(0)
+ assert isinstance(received_event, ScheduleRemoved)
+ assert received_event.schedule_id == "s1"
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, ScheduleUpdated)
+ assert received_event.schedule_id == "s2"
+ assert received_event.next_fire_time == datetime(
+ 2020, 9, 15, tzinfo=timezone.utc
+ )
+
+ assert not events
+
+ def test_release_schedule_two_identical_fire_times(
+ self, datastore: DataStore
+ ) -> None:
+ """Regression test for #616."""
+ for i in range(1, 3):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ schedules = datastore.acquire_schedules("foo", 3)
+ schedules[0].next_fire_time = None
+ datastore.release_schedules("foo", schedules)
+
+ remaining = datastore.get_schedules({s.id for s in schedules})
+ assert len(remaining) == 1
+ assert remaining[0].id == schedules[1].id
+
+ def test_release_two_schedules_at_once(self, datastore: DataStore) -> None:
+ """Regression test for #621."""
+ for i in range(2):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ schedules = datastore.acquire_schedules("foo", 3)
+ datastore.release_schedules("foo", schedules)
+
+ remaining = datastore.get_schedules({s.id for s in schedules})
+ assert len(remaining) == 2
+
+ def test_acquire_schedules_lock_timeout(
+ self, datastore: DataStore, schedules: list[Schedule], freezer
+ ) -> None:
+ """
+ Test that a scheduler can acquire schedules that were acquired by another scheduler but
+ not released within the lock timeout period.
+
+ """
+ datastore.add_schedule(schedules[0], ConflictPolicy.exception)
+
+ # First, one scheduler acquires the first available schedule
+ acquired1 = datastore.acquire_schedules("dummy-id1", 1)
+ assert len(acquired1) == 1
+ assert acquired1[0].id == "s1"
+
+ # Try to acquire the schedule just at the threshold (now == acquired_until).
+ # This should not yield any schedules.
+ freezer.tick(30)
+ acquired2 = datastore.acquire_schedules("dummy-id2", 1)
+ assert not acquired2
+
+ # Right after that, the schedule should be available
+ freezer.tick(1)
+ acquired3 = datastore.acquire_schedules("dummy-id2", 1)
+ assert len(acquired3) == 1
+ assert acquired3[0].id == "s1"
+
+ def test_acquire_multiple_workers(self, datastore: DataStore) -> None:
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ jobs = [Job(task_id="task1") for _ in range(2)]
+ for job in jobs:
+ datastore.add_job(job)
+
+ # The first worker gets the first job in the queue
+ jobs1 = datastore.acquire_jobs("worker1", 1)
+ assert len(jobs1) == 1
+ assert jobs1[0].id == jobs[0].id
+
+ # The second worker gets the second job
+ jobs2 = datastore.acquire_jobs("worker2", 1)
+ assert len(jobs2) == 1
+ assert jobs2[0].id == jobs[1].id
+
+ # The third worker gets nothing
+ jobs3 = datastore.acquire_jobs("worker3", 1)
+ assert not jobs3
+
+ def test_job_release_success(self, datastore: DataStore) -> None:
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ acquired = datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.success,
+ return_value="foo",
+ ),
+ )
+ result = datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.success
+ assert result.exception is None
+ assert result.return_value == "foo"
+
+ # Check that the job and its result are gone
+ assert not datastore.get_jobs({acquired[0].id})
+ assert not datastore.get_job_result(acquired[0].id)
+
+ def test_job_release_failure(self, datastore: DataStore) -> None:
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ acquired = datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.error,
+ exception=ValueError("foo"),
+ ),
+ )
+ result = datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.error
+ assert isinstance(result.exception, ValueError)
+ assert result.exception.args == ("foo",)
+ assert result.return_value is None
+
+ # Check that the job and its result are gone
+ assert not datastore.get_jobs({acquired[0].id})
+ assert not datastore.get_job_result(acquired[0].id)
+
+ def test_job_release_missed_deadline(self, datastore: DataStore):
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ acquired = datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ )
+ result = datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.missed_start_deadline
+ assert result.exception is None
+ assert result.return_value is None
+
+ # Check that the job and its result are gone
+ assert not datastore.get_jobs({acquired[0].id})
+ assert not datastore.get_job_result(acquired[0].id)
+
+ def test_job_release_cancelled(self, datastore: DataStore) -> None:
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ acquired = datastore.acquire_jobs("worker1", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ datastore.release_job(
+ "worker1",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ )
+ result = datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.cancelled
+ assert result.exception is None
+ assert result.return_value is None
+
+ # Check that the job and its result are gone
+ assert not datastore.get_jobs({acquired[0].id})
+ assert not datastore.get_job_result(acquired[0].id)
+
+ def test_acquire_jobs_lock_timeout(
+ self, datastore: DataStore, freezer: FrozenDateTimeFactory
+ ) -> None:
+ """
+ Test that a worker can acquire jobs that were acquired by another scheduler but not
+ released within the lock timeout period.
+
+ """
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ # First, one worker acquires the first available job
+ acquired = datastore.acquire_jobs("worker1", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ # Try to acquire the job just at the threshold (now == acquired_until).
+ # This should not yield any jobs.
+ freezer.tick(30)
+ assert not datastore.acquire_jobs("worker2", 1)
+
+ # Right after that, the job should be available
+ freezer.tick(1)
+ acquired = datastore.acquire_jobs("worker2", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ def test_acquire_jobs_max_number_exceeded(self, datastore: DataStore) -> None:
+ datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
+ )
+ jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
+ for job in jobs:
+ datastore.add_job(job)
+
+        # Check that only 2 jobs are returned from acquire_jobs() even though the limit was 3
+ acquired_jobs = datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
+
+ # Release one job, and the worker should be able to acquire the third job
+ datastore.release_job(
+ "worker1",
+ acquired_jobs[0].task_id,
+ JobResult(
+ job_id=acquired_jobs[0].id,
+ outcome=JobOutcome.success,
+ return_value=None,
+ ),
+ )
+ acquired_jobs = datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [jobs[2].id]
@pytest.mark.anyio
-class TestAsyncStores:
+class TestAsyncDataStores:
+ @asynccontextmanager
+ async def capture_events(
+ self,
+ datastore: AsyncDataStore,
+ limit: int,
+ event_types: set[type[Event]] | None = None,
+ ) -> AsyncGenerator[list[Event], None]:
+ def listener(event: Event) -> None:
+ events.append(event)
+ if len(events) == limit:
+ limit_event.set()
+ subscription.unsubscribe()
+
+ events: list[Event] = []
+ limit_event = anyio.Event()
+ subscription = datastore.events.subscribe(listener, event_types)
+ yield events
+ if limit:
+ with anyio.fail_after(3):
+ await limit_event.wait()
+
+ @pytest.fixture
+ async def event_broker(self) -> AsyncGenerator[AsyncEventBroker, Any]:
+ broker = LocalAsyncEventBroker()
+ await broker.start()
+ yield broker
+ await broker.stop()
+
+ @pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("adapted_memory_store"), id="memory"),
+ pytest.param(
+ lazy_fixture("asyncpg_store"),
+ id="asyncpg",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+ )
+ async def datastore(
+ self, request: SubRequest, event_broker: AsyncEventBroker
+ ) -> AsyncGenerator[AsyncDataStore, Any]:
+ datastore = cast(AsyncDataStore, request.param)
+ await datastore.start(event_broker)
+ yield datastore
+ await datastore.stop()
+
async def test_add_replace_task(self, datastore: AsyncDataStore) -> None:
import math
event_types = {TaskAdded, TaskUpdated}
- async with datastore, capture_events(datastore, 3, event_types) as events:
+ async with self.capture_events(datastore, 3, event_types) as events:
await datastore.add_task(Task(id="test_task", func=print))
await datastore.add_task(Task(id="test_task2", func=math.ceil))
await datastore.add_task(Task(id="test_task", func=repr))
@@ -198,7 +655,7 @@ class TestAsyncStores:
async def test_add_schedules(
self, datastore: AsyncDataStore, schedules: list[Schedule]
) -> None:
- async with datastore, capture_events(datastore, 3, {ScheduleAdded}) as events:
+ async with self.capture_events(datastore, 3, {ScheduleAdded}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
@@ -215,7 +672,7 @@ class TestAsyncStores:
async def test_replace_schedules(
self, datastore: AsyncDataStore, schedules: list[Schedule]
) -> None:
- async with datastore, capture_events(datastore, 1, {ScheduleUpdated}) as events:
+ async with self.capture_events(datastore, 1, {ScheduleUpdated}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
@@ -252,7 +709,7 @@ class TestAsyncStores:
async def test_remove_schedules(
self, datastore: AsyncDataStore, schedules: list[Schedule]
) -> None:
- async with datastore, capture_events(datastore, 2, {ScheduleRemoved}) as events:
+ async with self.capture_events(datastore, 2, {ScheduleRemoved}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
@@ -272,7 +729,7 @@ class TestAsyncStores:
self, datastore: AsyncDataStore, schedules: list[Schedule]
) -> None:
event_types = {ScheduleRemoved, ScheduleUpdated}
- async with datastore, capture_events(datastore, 2, event_types) as events:
+ async with self.capture_events(datastore, 2, event_types) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
@@ -323,16 +780,15 @@ class TestAsyncStores:
self, datastore: AsyncDataStore
) -> None:
"""Regression test for #616."""
- async with datastore:
- for i in range(1, 3):
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
- schedule.next_fire_time = trigger.next()
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
+ for i in range(1, 3):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
- schedules = await datastore.acquire_schedules("foo", 3)
- schedules[0].next_fire_time = None
- await datastore.release_schedules("foo", schedules)
+ schedules = await datastore.acquire_schedules("foo", 3)
+ schedules[0].next_fire_time = None
+ await datastore.release_schedules("foo", schedules)
remaining = await datastore.get_schedules({s.id for s in schedules})
assert len(remaining) == 1
@@ -342,15 +798,14 @@ class TestAsyncStores:
self, datastore: AsyncDataStore
) -> None:
"""Regression test for #621."""
- async with datastore:
- for i in range(2):
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
- schedule.next_fire_time = trigger.next()
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
+ for i in range(2):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
- schedules = await datastore.acquire_schedules("foo", 3)
- await datastore.release_schedules("foo", schedules)
+ schedules = await datastore.acquire_schedules("foo", 3)
+ await datastore.release_schedules("foo", schedules)
remaining = await datastore.get_schedules({s.id for s in schedules})
assert len(remaining) == 2
@@ -363,153 +818,145 @@ class TestAsyncStores:
not released within the lock timeout period.
"""
- async with datastore:
- await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
-
- # First, one scheduler acquires the first available schedule
- acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
- assert len(acquired1) == 1
- assert acquired1[0].id == "s1"
-
- # Try to acquire the schedule just at the threshold (now == acquired_until).
- # This should not yield any schedules.
- freezer.tick(30)
- acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
- assert not acquired2
-
- # Right after that, the schedule should be available
- freezer.tick(1)
- acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
- assert len(acquired3) == 1
- assert acquired3[0].id == "s1"
+ await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
- async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None:
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- jobs = [Job(task_id="task1") for _ in range(2)]
- for job in jobs:
- await datastore.add_job(job)
-
- # The first worker gets the first job in the queue
- jobs1 = await datastore.acquire_jobs("worker1", 1)
- assert len(jobs1) == 1
- assert jobs1[0].id == jobs[0].id
-
- # The second worker gets the second job
- jobs2 = await datastore.acquire_jobs("worker2", 1)
- assert len(jobs2) == 1
- assert jobs2[0].id == jobs[1].id
-
- # The third worker gets nothing
- jobs3 = await datastore.acquire_jobs("worker3", 1)
- assert not jobs3
-
- async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- await datastore.add_job(job)
+ # First, one scheduler acquires the first available schedule
+ acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
+ assert len(acquired1) == 1
+ assert acquired1[0].id == "s1"
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
- return_value="foo",
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.success
- assert result.exception is None
- assert result.return_value == "foo"
+ # Try to acquire the schedule just at the threshold (now == acquired_until).
+ # This should not yield any schedules.
+ freezer.tick(30)
+ acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
+ assert not acquired2
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
+ # Right after that, the schedule should be available
+ freezer.tick(1)
+ acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
+ assert len(acquired3) == 1
+ assert acquired3[0].id == "s1"
- async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None:
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ jobs = [Job(task_id="task1") for _ in range(2)]
+ for job in jobs:
await datastore.add_job(job)
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
- exception=ValueError("foo"),
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.error
- assert isinstance(result.exception, ValueError)
- assert result.exception.args == ("foo",)
- assert result.return_value is None
+ # The first worker gets the first job in the queue
+ jobs1 = await datastore.acquire_jobs("worker1", 1)
+ assert len(jobs1) == 1
+ assert jobs1[0].id == jobs[0].id
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
+ # The second worker gets the second job
+ jobs2 = await datastore.acquire_jobs("worker2", 1)
+ assert len(jobs2) == 1
+ assert jobs2[0].id == jobs[1].id
- async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- await datastore.add_job(job)
+ # The third worker gets nothing
+ jobs3 = await datastore.acquire_jobs("worker3", 1)
+ assert not jobs3
+
+ async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.success,
+ return_value="foo",
+ ),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.success
+ assert result.exception is None
+ assert result.return_value == "foo"
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.missed_start_deadline
- assert result.exception is None
- assert result.return_value is None
+ async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.error,
+ exception=ValueError("foo"),
+ ),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.error
+ assert isinstance(result.exception, ValueError)
+ assert result.exception.args == ("foo",)
+ assert result.return_value is None
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- await datastore.add_job(job)
+ async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.missed_start_deadline
+ assert result.exception is None
+ assert result.return_value is None
- acquired = await datastore.acquire_jobs("worker1", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- await datastore.release_job(
- "worker1",
- acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.cancelled
- assert result.exception is None
- assert result.return_value is None
+ async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker1", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker1",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.cancelled
+ assert result.exception is None
+ assert result.return_value is None
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
async def test_acquire_jobs_lock_timeout(
self, datastore: AsyncDataStore, freezer: FrozenDateTimeFactory
@@ -519,51 +966,49 @@ class TestAsyncStores:
released within the lock timeout period.
"""
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- await datastore.add_job(job)
-
- # First, one worker acquires the first available job
- acquired = await datastore.acquire_jobs("worker1", 1)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- # Try to acquire the job just at the threshold (now == acquired_until).
- # This should not yield any jobs.
- freezer.tick(30)
- assert not await datastore.acquire_jobs("worker2", 1)
-
- # Right after that, the job should be available
- freezer.tick(1)
- acquired = await datastore.acquire_jobs("worker2", 1)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ # First, one worker acquires the first available job
+ acquired = await datastore.acquire_jobs("worker1", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ # Try to acquire the job just at the threshold (now == acquired_until).
+ # This should not yield any jobs.
+ freezer.tick(30)
+ assert not await datastore.acquire_jobs("worker2", 1)
+
+ # Right after that, the job should be available
+ freezer.tick(1)
+ acquired = await datastore.acquire_jobs("worker2", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
async def test_acquire_jobs_max_number_exceeded(
self, datastore: AsyncDataStore
) -> None:
- async with datastore:
- await datastore.add_task(
- Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
- )
- jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
- for job in jobs:
- await datastore.add_job(job)
-
- # Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3
- acquired_jobs = await datastore.acquire_jobs("worker1", 3)
- assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
-
- # Release one job, and the worker should be able to acquire the third job
- await datastore.release_job(
- "worker1",
- acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
- return_value=None,
- ),
- )
- acquired_jobs = await datastore.acquire_jobs("worker1", 3)
- assert [job.id for job in acquired_jobs] == [jobs[2].id]
+ await datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
+ )
+ jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
+ for job in jobs:
+ await datastore.add_job(job)
+
+ # Check that only 2 jobs are returned from acquire_jobs() even though the limit was 3
+ acquired_jobs = await datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
+
+ # Release one job, and the worker should be able to acquire the third job
+ await datastore.release_job(
+ "worker1",
+ acquired_jobs[0].task_id,
+ JobResult(
+ job_id=acquired_jobs[0].id,
+ outcome=JobOutcome.success,
+ return_value=None,
+ ),
+ )
+ acquired_jobs = await datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [jobs[2].id]
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
index 7ec90ab..ffa4be7 100644
--- a/tests/test_eventbrokers.py
+++ b/tests/test_eventbrokers.py
@@ -1,9 +1,10 @@
from __future__ import annotations
+from collections.abc import AsyncGenerator, Generator
from concurrent.futures import Future
from datetime import datetime, timezone
from queue import Empty, Queue
-from typing import Callable
+from typing import Any
import pytest
from _pytest.fixtures import SubRequest
@@ -73,8 +74,10 @@ async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker:
),
]
)
-def broker(request: SubRequest) -> Callable[[], EventBroker]:
- return request.param
+def broker(request: SubRequest) -> Generator[EventBroker, Any, None]:
+ request.param.start()
+ yield request.param
+ request.param.stop()
@pytest.fixture(
@@ -87,23 +90,24 @@ def broker(request: SubRequest) -> Callable[[], EventBroker]:
),
]
)
-def async_broker(request: SubRequest) -> Callable[[], AsyncEventBroker]:
- return request.param
+async def async_broker(request: SubRequest) -> AsyncGenerator[AsyncEventBroker, Any]:
+ await request.param.start()
+ yield request.param
+ await request.param.stop()
class TestEventBroker:
def test_publish_subscribe(self, broker: EventBroker) -> None:
queue: Queue[Event] = Queue()
- with broker:
- broker.subscribe(queue.put_nowait)
- broker.subscribe(queue.put_nowait)
- event = ScheduleAdded(
- schedule_id="schedule1",
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
- )
- broker.publish(event)
- event1 = queue.get(timeout=3)
- event2 = queue.get(timeout=1)
+ broker.subscribe(queue.put_nowait)
+ broker.subscribe(queue.put_nowait)
+ event = ScheduleAdded(
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
+ broker.publish(event)
+ event1 = queue.get(timeout=3)
+ event2 = queue.get(timeout=1)
assert event1 == event2
assert isinstance(event1, ScheduleAdded)
@@ -115,43 +119,39 @@ class TestEventBroker:
def test_subscribe_one_shot(self, broker: EventBroker) -> None:
queue: Queue[Event] = Queue()
- with broker:
- broker.subscribe(queue.put_nowait, one_shot=True)
- event = ScheduleAdded(
- schedule_id="schedule1",
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
- )
- broker.publish(event)
- event = ScheduleAdded(
- schedule_id="schedule2",
- next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
- )
- broker.publish(event)
- received_event = queue.get(timeout=3)
- with pytest.raises(Empty):
- queue.get(timeout=0.1)
+ broker.subscribe(queue.put_nowait, one_shot=True)
+ event = ScheduleAdded(
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
+ broker.publish(event)
+ event = ScheduleAdded(
+ schedule_id="schedule2",
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
+ )
+ broker.publish(event)
+ received_event = queue.get(timeout=3)
+ with pytest.raises(Empty):
+ queue.get(timeout=0.1)
assert isinstance(received_event, ScheduleAdded)
assert received_event.schedule_id == "schedule1"
def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
queue: Queue[Event] = Queue()
- with broker:
- subscription = broker.subscribe(queue.put_nowait)
- broker.publish(Event())
- queue.get(timeout=3)
+ subscription = broker.subscribe(queue.put_nowait)
+ broker.publish(Event())
+ queue.get(timeout=3)
- subscription.unsubscribe()
- broker.publish(Event())
- with pytest.raises(Empty):
- queue.get(timeout=0.1)
+ subscription.unsubscribe()
+ broker.publish(Event())
+ with pytest.raises(Empty):
+ queue.get(timeout=0.1)
def test_publish_no_subscribers(
self, broker: EventBroker, caplog: LogCaptureFixture
) -> None:
- with broker:
- broker.publish(Event())
-
+ broker.publish(Event())
assert not caplog.text
def test_publish_exception(
@@ -162,33 +162,31 @@ class TestEventBroker:
timestamp = datetime.now(timezone.utc)
event_future: Future[Event] = Future()
- with broker:
- broker.subscribe(bad_subscriber)
- broker.subscribe(event_future.set_result)
- broker.publish(Event(timestamp=timestamp))
+ broker.subscribe(bad_subscriber)
+ broker.subscribe(event_future.set_result)
+ broker.publish(Event(timestamp=timestamp))
- event = event_future.result(3)
- assert isinstance(event, Event)
- assert event.timestamp == timestamp
- assert "Error delivering Event" in caplog.text
+ event = event_future.result(3)
+ assert isinstance(event, Event)
+ assert event.timestamp == timestamp
+ assert "Error delivering Event" in caplog.text
@pytest.mark.anyio
class TestAsyncEventBroker:
async def test_publish_subscribe(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream(2)
- async with async_broker:
- async_broker.subscribe(send.send)
- async_broker.subscribe(send.send_nowait)
- event = ScheduleAdded(
- schedule_id="schedule1",
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
- )
- await async_broker.publish(event)
-
- with fail_after(3):
- event1 = await receive.receive()
- event2 = await receive.receive()
+ async_broker.subscribe(send.send)
+ async_broker.subscribe(send.send_nowait)
+ event = ScheduleAdded(
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
+ await async_broker.publish(event)
+
+ with fail_after(3):
+ event1 = await receive.receive()
+ event2 = await receive.receive()
assert event1 == event2
assert isinstance(event1, ScheduleAdded)
@@ -200,47 +198,43 @@ class TestAsyncEventBroker:
async def test_subscribe_one_shot(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream(2)
- async with async_broker:
- async_broker.subscribe(send.send, one_shot=True)
- event = ScheduleAdded(
- schedule_id="schedule1",
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
- )
- await async_broker.publish(event)
- event = ScheduleAdded(
- schedule_id="schedule2",
- next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
- )
- await async_broker.publish(event)
-
- with fail_after(3):
- received_event = await receive.receive()
-
- with pytest.raises(TimeoutError), fail_after(0.1):
- await receive.receive()
+ async_broker.subscribe(send.send, one_shot=True)
+ event = ScheduleAdded(
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
+ await async_broker.publish(event)
+ event = ScheduleAdded(
+ schedule_id="schedule2",
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
+ )
+ await async_broker.publish(event)
+
+ with fail_after(3):
+ received_event = await receive.receive()
+
+ with pytest.raises(TimeoutError), fail_after(0.1):
+ await receive.receive()
assert isinstance(received_event, ScheduleAdded)
assert received_event.schedule_id == "schedule1"
async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream()
- async with async_broker:
- subscription = async_broker.subscribe(send.send)
- await async_broker.publish(Event())
- with fail_after(3):
- await receive.receive()
+ subscription = async_broker.subscribe(send.send)
+ await async_broker.publish(Event())
+ with fail_after(3):
+ await receive.receive()
- subscription.unsubscribe()
- await async_broker.publish(Event())
- with pytest.raises(TimeoutError), fail_after(0.1):
- await receive.receive()
+ subscription.unsubscribe()
+ await async_broker.publish(Event())
+ with pytest.raises(TimeoutError), fail_after(0.1):
+ await receive.receive()
async def test_publish_no_subscribers(
self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture
) -> None:
- async with async_broker:
- await async_broker.publish(Event())
-
+ await async_broker.publish(Event())
assert not caplog.text
async def test_publish_exception(
@@ -251,11 +245,10 @@ class TestAsyncEventBroker:
timestamp = datetime.now(timezone.utc)
send, receive = create_memory_object_stream()
- async with async_broker:
- async_broker.subscribe(bad_subscriber)
- async_broker.subscribe(send.send)
- await async_broker.publish(Event(timestamp=timestamp))
+ async_broker.subscribe(bad_subscriber)
+ async_broker.subscribe(send.send)
+ await async_broker.publish(Event(timestamp=timestamp))
- received_event = await receive.receive()
- assert received_event.timestamp == timestamp
- assert "Error delivering Event" in caplog.text
+ received_event = await receive.receive()
+ assert received_event.timestamp == timestamp
+ assert "Error delivering Event" in caplog.text
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index b8c64a2..e33a91e 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -18,7 +18,6 @@ from apscheduler.events import (
JobAdded,
ScheduleAdded,
ScheduleRemoved,
- SchedulerStarted,
SchedulerStopped,
TaskAdded,
)
@@ -28,6 +27,7 @@ from apscheduler.schedulers.sync import Scheduler
from apscheduler.structures import Job, Task
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
+from apscheduler.workers.async_ import AsyncWorker
if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo
@@ -57,23 +57,18 @@ class TestAsyncScheduler:
async def test_schedule_job(self) -> None:
def listener(received_event: Event) -> None:
received_events.append(received_event)
- if len(received_events) == 5:
+ if isinstance(received_event, ScheduleRemoved):
event.set()
received_events: list[Event] = []
event = anyio.Event()
- scheduler = AsyncScheduler(start_worker=False)
- scheduler.events.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
- async with scheduler:
+ async with AsyncScheduler(start_worker=False) as scheduler:
+ scheduler.event_broker.subscribe(listener)
await scheduler.add_schedule(dummy_async_job, trigger, id="foo")
with fail_after(3):
await event.wait()
- # The scheduler was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, SchedulerStarted)
-
# Then the task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
@@ -129,7 +124,7 @@ class TestAsyncScheduler:
async with AsyncScheduler(start_worker=False) as scheduler:
trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
job_added_event = anyio.Event()
- scheduler.events.subscribe(job_added_listener, {JobAdded})
+ scheduler.event_broker.subscribe(job_added_listener, {JobAdded})
schedule_id = await scheduler.add_schedule(
dummy_async_job, trigger, max_jitter=max_jitter
)
@@ -142,7 +137,8 @@ class TestAsyncScheduler:
fake_uniform.assert_called_once_with(0, expected_upper_bound)
- # Check that the job was created with the proper amount of jitter in its scheduled time
+ # Check that the job was created with the proper amount of jitter in its
+ # scheduled time
jobs = await scheduler.data_store.get_jobs({job_id})
assert jobs[0].jitter == timedelta(seconds=jitter)
assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(
@@ -188,7 +184,7 @@ class TestAsyncScheduler:
async def test_contextvars(self) -> None:
def check_contextvars() -> None:
assert current_scheduler.get() is scheduler
- assert current_worker.get() is scheduler.worker
+ assert isinstance(current_worker.get(), AsyncWorker)
info = job_info.get()
assert info.task_id == "task_id"
assert info.schedule_id == "foo"
@@ -223,23 +219,18 @@ class TestSyncScheduler:
def test_schedule_job(self):
def listener(received_event: Event) -> None:
received_events.append(received_event)
- if len(received_events) == 5:
+ if isinstance(received_event, ScheduleRemoved):
event.set()
received_events: list[Event] = []
event = threading.Event()
- scheduler = Scheduler(start_worker=False)
- scheduler.events.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
- with scheduler:
+ with Scheduler(start_worker=False) as scheduler:
+ scheduler.event_broker.subscribe(listener)
scheduler.add_schedule(dummy_sync_job, trigger, id="foo")
event.wait(3)
- # The scheduler was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, SchedulerStarted)
-
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "test_schedulers:dummy_sync_job"
@@ -264,9 +255,6 @@ class TestSyncScheduler:
received_event = received_events.pop(0)
assert isinstance(received_event, SchedulerStopped)
- # There should be no more events on the list
- assert not received_events
-
@pytest.mark.parametrize(
"max_jitter, expected_upper_bound",
[pytest.param(2, 2, id="within"), pytest.param(4, 2.999999, id="exceed")],
@@ -293,7 +281,7 @@ class TestSyncScheduler:
with Scheduler(start_worker=False) as scheduler:
trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
job_added_event = threading.Event()
- scheduler.events.subscribe(job_added_listener, {JobAdded})
+ scheduler.event_broker.subscribe(job_added_listener, {JobAdded})
schedule_id = scheduler.add_schedule(
dummy_async_job, trigger, max_jitter=max_jitter
)
@@ -350,7 +338,7 @@ class TestSyncScheduler:
def test_contextvars(self) -> None:
def check_contextvars() -> None:
assert current_scheduler.get() is scheduler
- assert current_worker.get() is scheduler.worker
+ assert current_worker.get() is not None
info = job_info.get()
assert info.task_id == "task_id"
assert info.schedule_id == "foo"
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 6e5568f..e1c7421 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -17,7 +17,6 @@ from apscheduler.events import (
JobAdded,
JobReleased,
TaskAdded,
- WorkerStarted,
WorkerStopped,
)
from apscheduler.structures import Task
@@ -55,26 +54,20 @@ class TestAsyncWorker:
) -> None:
def listener(received_event: Event):
received_events.append(received_event)
- if len(received_events) == 5:
+ if isinstance(received_event, JobReleased):
event.set()
received_events: list[Event] = []
event = anyio.Event()
- data_store = MemoryDataStore()
- worker = AsyncWorker(data_store)
- worker.events.subscribe(listener)
- async with worker:
+ async with AsyncWorker(MemoryDataStore()) as worker:
+ worker.event_broker.subscribe(listener)
await worker.data_store.add_task(Task(id="task_id", func=target_func))
job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail})
await worker.data_store.add_job(job)
with fail_after(3):
await event.wait()
- # The worker was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStarted)
-
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "task_id"
@@ -112,16 +105,14 @@ class TestAsyncWorker:
async def test_run_deadline_missed(self) -> None:
def listener(received_event: Event):
received_events.append(received_event)
- if len(received_events) == 5:
+ if isinstance(received_event, JobReleased):
event.set()
scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
received_events: list[Event] = []
event = anyio.Event()
- data_store = MemoryDataStore()
- worker = AsyncWorker(data_store)
- worker.events.subscribe(listener)
- async with worker:
+ async with AsyncWorker(MemoryDataStore()) as worker:
+ worker.event_broker.subscribe(listener)
await worker.data_store.add_task(Task(id="task_id", func=fail_func))
job = Job(
task_id="task_id",
@@ -133,11 +124,7 @@ class TestAsyncWorker:
with fail_after(3):
await event.wait()
- # The worker was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStarted)
-
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "task_id"
@@ -175,25 +162,19 @@ class TestSyncWorker:
def test_run_job_nonscheduled(self, fail: bool) -> None:
def listener(received_event: Event):
received_events.append(received_event)
- if len(received_events) == 5:
+ if isinstance(received_event, JobReleased):
event.set()
received_events: list[Event] = []
event = threading.Event()
- data_store = MemoryDataStore()
- worker = Worker(data_store)
- worker.events.subscribe(listener)
- with worker:
+ with Worker(MemoryDataStore()) as worker:
+ worker.event_broker.subscribe(listener)
worker.data_store.add_task(Task(id="task_id", func=sync_func))
job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail})
worker.data_store.add_job(job)
- event.wait(5)
-
- # The worker was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStarted)
+ event.wait(3)
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "task_id"
@@ -229,18 +210,16 @@ class TestSyncWorker:
assert not received_events
def test_run_deadline_missed(self) -> None:
- def listener(worker_event: Event):
- received_events.append(worker_event)
- if len(received_events) == 5:
+ def listener(received_event: Event):
+ received_events.append(received_event)
+ if isinstance(received_event, JobReleased):
event.set()
scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
received_events: list[Event] = []
event = threading.Event()
- data_store = MemoryDataStore()
- worker = Worker(data_store)
- worker.events.subscribe(listener)
- with worker:
+ with Worker(MemoryDataStore()) as worker:
+ worker.event_broker.subscribe(listener)
worker.data_store.add_task(Task(id="task_id", func=fail_func))
job = Job(
task_id="task_id",
@@ -251,11 +230,7 @@ class TestSyncWorker:
worker.data_store.add_job(job)
event.wait(3)
- # The worker was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStarted)
-
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "task_id"