path: root/tests/test_datastores.py
Diffstat (limited to 'tests/test_datastores.py')
-rw-r--r--  tests/test_datastores.py  947
1 file changed, 696 insertions(+), 251 deletions(-)
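The diff below reorganizes the synchronous store tests into a TestDataStores class: the datastore fixture no longer wraps stores in AsyncDataStoreAdapter, and instead starts each store against an explicit LocalEventBroker. A minimal standalone sketch of that wiring, using only the start()/stop() calls visible in the fixtures (the try/finally teardown is an assumption standing in for the fixture's yield-based cleanup):

    from apscheduler.datastores.memory import MemoryDataStore
    from apscheduler.eventbrokers.local import LocalEventBroker

    # Start the broker first, then hand it to the store, mirroring the fixtures below.
    broker = LocalEventBroker()
    broker.start()
    store = MemoryDataStore()
    store.start(broker)  # the store publishes TaskAdded/ScheduleAdded/... events through the broker
    try:
        print(store.get_schedules())  # a fresh store has no schedules
    finally:
        store.stop()
        broker.stop()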
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index d6376ea..ea5416e 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -1,19 +1,31 @@
from __future__ import annotations
-from contextlib import asynccontextmanager
+import threading
+from collections.abc import Generator
+from contextlib import asynccontextmanager, contextmanager
from datetime import datetime, timezone
from tempfile import TemporaryDirectory
-from typing import AsyncGenerator
+from typing import Any, AsyncGenerator, cast
import anyio
import pytest
+from _pytest.fixtures import SubRequest
from freezegun.api import FrozenDateTimeFactory
from pytest_lazyfixture import lazy_fixture
-from apscheduler.abc import AsyncDataStore, DataStore, Job, Schedule
+from apscheduler.abc import (
+ AsyncDataStore,
+ AsyncEventBroker,
+ DataStore,
+ EventBroker,
+ Job,
+ Schedule,
+)
from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome
+from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker
+from apscheduler.eventbrokers.local import LocalEventBroker
from apscheduler.events import (
Event,
ScheduleAdded,
@@ -32,6 +44,12 @@ def memory_store() -> DataStore:
@pytest.fixture
+def adapted_memory_store() -> AsyncDataStore:
+ store = MemoryDataStore()
+ return AsyncDataStoreAdapter(store)
+
+
+@pytest.fixture
def mongodb_store() -> DataStore:
from pymongo import MongoClient
@@ -96,39 +114,6 @@ async def asyncpg_store() -> AsyncDataStore:
await engine.dispose()
-@pytest.fixture(
- params=[
- pytest.param(lazy_fixture("memory_store"), id="memory"),
- pytest.param(lazy_fixture("sqlite_store"), id="sqlite"),
- pytest.param(
- lazy_fixture("mongodb_store"),
- id="mongodb",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("psycopg2_store"),
- id="psycopg2",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("asyncpg_store"),
- id="asyncpg",
- marks=[pytest.mark.external_service],
- ),
- pytest.param(
- lazy_fixture("mysql_store"),
- id="mysql",
- marks=[pytest.mark.external_service],
- ),
- ]
-)
-async def datastore(request):
- if isinstance(request.param, DataStore):
- return AsyncDataStoreAdapter(request.param)
- else:
- return request.param
-
-
@pytest.fixture
def schedules() -> list[Schedule]:
trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
@@ -144,32 +129,504 @@ def schedules() -> list[Schedule]:
return [schedule1, schedule2, schedule3]
-@asynccontextmanager
-async def capture_events(
- datastore: AsyncDataStore, limit: int, event_types: set[type[Event]] | None = None
-) -> AsyncGenerator[list[Event], None]:
- def listener(event: Event) -> None:
- events.append(event)
- if len(events) == limit:
- limit_event.set()
- subscription.unsubscribe()
+class TestDataStores:
+ @contextmanager
+ def capture_events(
+ self,
+ datastore: DataStore,
+ limit: int,
+ event_types: set[type[Event]] | None = None,
+ ) -> Generator[list[Event], None, None]:
+ def listener(event: Event) -> None:
+ events.append(event)
+ if len(events) == limit:
+ limit_event.set()
+ subscription.unsubscribe()
+
+ events: list[Event] = []
+ limit_event = threading.Event()
+ subscription = datastore.events.subscribe(listener, event_types)
+ yield events
+ if limit:
+ limit_event.wait(2)
+
+ @pytest.fixture
+ def event_broker(self) -> Generator[EventBroker, Any, None]:
+ broker = LocalEventBroker()
+ broker.start()
+ yield broker
+ broker.stop()
+
+ @pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("memory_store"), id="memory"),
+ pytest.param(lazy_fixture("sqlite_store"), id="sqlite"),
+ pytest.param(
+ lazy_fixture("mongodb_store"),
+ id="mongodb",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("psycopg2_store"),
+ id="psycopg2",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("mysql_store"),
+ id="mysql",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+ )
+ def datastore(
+ self, request: SubRequest, event_broker: EventBroker
+ ) -> Generator[DataStore, Any, None]:
+ datastore = cast(DataStore, request.param)
+ datastore.start(event_broker)
+ yield datastore
+ datastore.stop()
+
+ def test_add_replace_task(self, datastore: DataStore) -> None:
+ import math
- events: list[Event] = []
- limit_event = anyio.Event()
- subscription = datastore.events.subscribe(listener, event_types)
- yield events
- if limit:
- with anyio.fail_after(3):
- await limit_event.wait()
+ event_types = {TaskAdded, TaskUpdated}
+ with self.capture_events(datastore, 3, event_types) as events:
+ datastore.add_task(Task(id="test_task", func=print))
+ datastore.add_task(Task(id="test_task2", func=math.ceil))
+ datastore.add_task(Task(id="test_task", func=repr))
+
+ tasks = datastore.get_tasks()
+ assert len(tasks) == 2
+ assert tasks[0].id == "test_task"
+ assert tasks[0].func is repr
+ assert tasks[1].id == "test_task2"
+ assert tasks[1].func is math.ceil
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskAdded)
+ assert received_event.task_id == "test_task"
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskAdded)
+ assert received_event.task_id == "test_task2"
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, TaskUpdated)
+ assert received_event.task_id == "test_task"
+
+ assert not events
+
+ def test_add_schedules(
+ self, datastore: DataStore, schedules: list[Schedule]
+ ) -> None:
+ with self.capture_events(datastore, 3, {ScheduleAdded}) as events:
+ for schedule in schedules:
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ assert datastore.get_schedules() == schedules
+ assert datastore.get_schedules({"s1", "s2", "s3"}) == schedules
+ assert datastore.get_schedules({"s1"}) == [schedules[0]]
+ assert datastore.get_schedules({"s2"}) == [schedules[1]]
+ assert datastore.get_schedules({"s3"}) == [schedules[2]]
+
+ for event, schedule in zip(events, schedules):
+ assert event.schedule_id == schedule.id
+ assert event.next_fire_time == schedule.next_fire_time
+
+ def test_replace_schedules(
+ self, datastore: DataStore, schedules: list[Schedule]
+ ) -> None:
+ with self.capture_events(datastore, 1, {ScheduleUpdated}) as events:
+ for schedule in schedules:
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ next_fire_time = schedules[2].trigger.next()
+ schedule = Schedule(
+ id="s3",
+ task_id="foo",
+ trigger=schedules[2].trigger,
+ args=(),
+ kwargs={},
+ coalesce=CoalescePolicy.earliest,
+ misfire_grace_time=None,
+ tags=frozenset(),
+ )
+ schedule.next_fire_time = next_fire_time
+ datastore.add_schedule(schedule, ConflictPolicy.replace)
+
+ schedules = datastore.get_schedules({schedule.id})
+ assert schedules[0].task_id == "foo"
+ assert schedules[0].next_fire_time == next_fire_time
+ assert schedules[0].args == ()
+ assert schedules[0].kwargs == {}
+ assert schedules[0].coalesce is CoalescePolicy.earliest
+ assert schedules[0].misfire_grace_time is None
+ assert schedules[0].tags == frozenset()
+
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s3"
+ assert received_event.next_fire_time == datetime(
+ 2020, 9, 15, tzinfo=timezone.utc
+ )
+ assert not events
+
+ def test_remove_schedules(
+ self, datastore: DataStore, schedules: list[Schedule]
+ ) -> None:
+ with self.capture_events(datastore, 2, {ScheduleRemoved}) as events:
+ for schedule in schedules:
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ datastore.remove_schedules(["s1", "s2"])
+ assert datastore.get_schedules() == [schedules[2]]
+
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s1"
+
+ received_event = events.pop(0)
+ assert received_event.schedule_id == "s2"
+
+ assert not events
+
+ @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
+ def test_acquire_release_schedules(
+ self, datastore: DataStore, schedules: list[Schedule]
+ ) -> None:
+ event_types = {ScheduleRemoved, ScheduleUpdated}
+ with self.capture_events(datastore, 2, event_types) as events:
+ for schedule in schedules:
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ # The first scheduler gets the first due schedule
+ schedules1 = datastore.acquire_schedules("dummy-id1", 1)
+ assert len(schedules1) == 1
+ assert schedules1[0].id == "s1"
+
+ # The second scheduler gets the second due schedule
+ schedules2 = datastore.acquire_schedules("dummy-id2", 1)
+ assert len(schedules2) == 1
+ assert schedules2[0].id == "s2"
+
+ # The third scheduler gets nothing
+ schedules3 = datastore.acquire_schedules("dummy-id3", 1)
+ assert not schedules3
+
+            # Update the schedules and check that the data store actually deletes the first
+            # one and updates the second one
+ schedules1[0].next_fire_time = None
+ schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
+
+ # Release all the schedules
+ datastore.release_schedules("dummy-id1", schedules1)
+ datastore.release_schedules("dummy-id2", schedules2)
+
+ # Check that the first schedule is gone
+ schedules = datastore.get_schedules()
+ assert len(schedules) == 2
+ assert schedules[0].id == "s2"
+ assert schedules[1].id == "s3"
+
+ # Check for the appropriate update and delete events
+ received_event = events.pop(0)
+ assert isinstance(received_event, ScheduleRemoved)
+ assert received_event.schedule_id == "s1"
+
+ received_event = events.pop(0)
+ assert isinstance(received_event, ScheduleUpdated)
+ assert received_event.schedule_id == "s2"
+ assert received_event.next_fire_time == datetime(
+ 2020, 9, 15, tzinfo=timezone.utc
+ )
+
+ assert not events
+
+ def test_release_schedule_two_identical_fire_times(
+ self, datastore: DataStore
+ ) -> None:
+ """Regression test for #616."""
+ for i in range(1, 3):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ schedules = datastore.acquire_schedules("foo", 3)
+ schedules[0].next_fire_time = None
+ datastore.release_schedules("foo", schedules)
+
+ remaining = datastore.get_schedules({s.id for s in schedules})
+ assert len(remaining) == 1
+ assert remaining[0].id == schedules[1].id
+
+ def test_release_two_schedules_at_once(self, datastore: DataStore) -> None:
+ """Regression test for #621."""
+ for i in range(2):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ datastore.add_schedule(schedule, ConflictPolicy.exception)
+
+ schedules = datastore.acquire_schedules("foo", 3)
+ datastore.release_schedules("foo", schedules)
+
+ remaining = datastore.get_schedules({s.id for s in schedules})
+ assert len(remaining) == 2
+
+ def test_acquire_schedules_lock_timeout(
+ self, datastore: DataStore, schedules: list[Schedule], freezer
+ ) -> None:
+ """
+ Test that a scheduler can acquire schedules that were acquired by another scheduler but
+ not released within the lock timeout period.
+
+ """
+ datastore.add_schedule(schedules[0], ConflictPolicy.exception)
+
+ # First, one scheduler acquires the first available schedule
+ acquired1 = datastore.acquire_schedules("dummy-id1", 1)
+ assert len(acquired1) == 1
+ assert acquired1[0].id == "s1"
+
+ # Try to acquire the schedule just at the threshold (now == acquired_until).
+ # This should not yield any schedules.
+ freezer.tick(30)
+ acquired2 = datastore.acquire_schedules("dummy-id2", 1)
+ assert not acquired2
+
+ # Right after that, the schedule should be available
+ freezer.tick(1)
+ acquired3 = datastore.acquire_schedules("dummy-id2", 1)
+ assert len(acquired3) == 1
+ assert acquired3[0].id == "s1"
+
+ def test_acquire_multiple_workers(self, datastore: DataStore) -> None:
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ jobs = [Job(task_id="task1") for _ in range(2)]
+ for job in jobs:
+ datastore.add_job(job)
+
+ # The first worker gets the first job in the queue
+ jobs1 = datastore.acquire_jobs("worker1", 1)
+ assert len(jobs1) == 1
+ assert jobs1[0].id == jobs[0].id
+
+ # The second worker gets the second job
+ jobs2 = datastore.acquire_jobs("worker2", 1)
+ assert len(jobs2) == 1
+ assert jobs2[0].id == jobs[1].id
+
+ # The third worker gets nothing
+ jobs3 = datastore.acquire_jobs("worker3", 1)
+ assert not jobs3
+
+ def test_job_release_success(self, datastore: DataStore) -> None:
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ acquired = datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.success,
+ return_value="foo",
+ ),
+ )
+ result = datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.success
+ assert result.exception is None
+ assert result.return_value == "foo"
+
+ # Check that the job and its result are gone
+ assert not datastore.get_jobs({acquired[0].id})
+ assert not datastore.get_job_result(acquired[0].id)
+
+ def test_job_release_failure(self, datastore: DataStore) -> None:
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ acquired = datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.error,
+ exception=ValueError("foo"),
+ ),
+ )
+ result = datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.error
+ assert isinstance(result.exception, ValueError)
+ assert result.exception.args == ("foo",)
+ assert result.return_value is None
+
+ # Check that the job and its result are gone
+ assert not datastore.get_jobs({acquired[0].id})
+ assert not datastore.get_job_result(acquired[0].id)
+
+ def test_job_release_missed_deadline(self, datastore: DataStore):
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ acquired = datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ )
+ result = datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.missed_start_deadline
+ assert result.exception is None
+ assert result.return_value is None
+
+ # Check that the job and its result are gone
+ assert not datastore.get_jobs({acquired[0].id})
+ assert not datastore.get_job_result(acquired[0].id)
+
+ def test_job_release_cancelled(self, datastore: DataStore) -> None:
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ acquired = datastore.acquire_jobs("worker1", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ datastore.release_job(
+ "worker1",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ )
+ result = datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.cancelled
+ assert result.exception is None
+ assert result.return_value is None
+
+ # Check that the job and its result are gone
+ assert not datastore.get_jobs({acquired[0].id})
+ assert not datastore.get_job_result(acquired[0].id)
+
+ def test_acquire_jobs_lock_timeout(
+ self, datastore: DataStore, freezer: FrozenDateTimeFactory
+ ) -> None:
+ """
+        Test that a worker can acquire jobs that were acquired by another worker but not
+ released within the lock timeout period.
+
+ """
+ datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ datastore.add_job(job)
+
+ # First, one worker acquires the first available job
+ acquired = datastore.acquire_jobs("worker1", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ # Try to acquire the job just at the threshold (now == acquired_until).
+ # This should not yield any jobs.
+ freezer.tick(30)
+ assert not datastore.acquire_jobs("worker2", 1)
+
+ # Right after that, the job should be available
+ freezer.tick(1)
+ acquired = datastore.acquire_jobs("worker2", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ def test_acquire_jobs_max_number_exceeded(self, datastore: DataStore) -> None:
+ datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
+ )
+ jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
+ for job in jobs:
+ datastore.add_job(job)
+
+        # Check that only 2 jobs are returned from acquire_jobs() even though the limit was 3
+ acquired_jobs = datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
+
+ # Release one job, and the worker should be able to acquire the third job
+ datastore.release_job(
+ "worker1",
+ acquired_jobs[0].task_id,
+ JobResult(
+ job_id=acquired_jobs[0].id,
+ outcome=JobOutcome.success,
+ return_value=None,
+ ),
+ )
+ acquired_jobs = datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [jobs[2].id]
@pytest.mark.anyio
-class TestAsyncStores:
+class TestAsyncDataStores:
+ @asynccontextmanager
+ async def capture_events(
+ self,
+ datastore: AsyncDataStore,
+ limit: int,
+ event_types: set[type[Event]] | None = None,
+ ) -> AsyncGenerator[list[Event], None]:
+ def listener(event: Event) -> None:
+ events.append(event)
+ if len(events) == limit:
+ limit_event.set()
+ subscription.unsubscribe()
+
+ events: list[Event] = []
+ limit_event = anyio.Event()
+ subscription = datastore.events.subscribe(listener, event_types)
+ yield events
+ if limit:
+ with anyio.fail_after(3):
+ await limit_event.wait()
+
+ @pytest.fixture
+ async def event_broker(self) -> AsyncGenerator[AsyncEventBroker, Any]:
+ broker = LocalAsyncEventBroker()
+ await broker.start()
+ yield broker
+ await broker.stop()
+
+ @pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("adapted_memory_store"), id="memory"),
+ pytest.param(
+ lazy_fixture("asyncpg_store"),
+ id="asyncpg",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+ )
+ async def datastore(
+ self, request: SubRequest, event_broker: AsyncEventBroker
+ ) -> AsyncGenerator[AsyncDataStore, Any]:
+ datastore = cast(AsyncDataStore, request.param)
+ await datastore.start(event_broker)
+ yield datastore
+ await datastore.stop()
+
async def test_add_replace_task(self, datastore: AsyncDataStore) -> None:
import math
event_types = {TaskAdded, TaskUpdated}
- async with datastore, capture_events(datastore, 3, event_types) as events:
+ async with self.capture_events(datastore, 3, event_types) as events:
await datastore.add_task(Task(id="test_task", func=print))
await datastore.add_task(Task(id="test_task2", func=math.ceil))
await datastore.add_task(Task(id="test_task", func=repr))
@@ -198,7 +655,7 @@ class TestAsyncStores:
async def test_add_schedules(
self, datastore: AsyncDataStore, schedules: list[Schedule]
) -> None:
- async with datastore, capture_events(datastore, 3, {ScheduleAdded}) as events:
+ async with self.capture_events(datastore, 3, {ScheduleAdded}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
@@ -215,7 +672,7 @@ class TestAsyncStores:
async def test_replace_schedules(
self, datastore: AsyncDataStore, schedules: list[Schedule]
) -> None:
- async with datastore, capture_events(datastore, 1, {ScheduleUpdated}) as events:
+ async with self.capture_events(datastore, 1, {ScheduleUpdated}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
@@ -252,7 +709,7 @@ class TestAsyncStores:
async def test_remove_schedules(
self, datastore: AsyncDataStore, schedules: list[Schedule]
) -> None:
- async with datastore, capture_events(datastore, 2, {ScheduleRemoved}) as events:
+ async with self.capture_events(datastore, 2, {ScheduleRemoved}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
@@ -272,7 +729,7 @@ class TestAsyncStores:
self, datastore: AsyncDataStore, schedules: list[Schedule]
) -> None:
event_types = {ScheduleRemoved, ScheduleUpdated}
- async with datastore, capture_events(datastore, 2, event_types) as events:
+ async with self.capture_events(datastore, 2, event_types) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
@@ -323,16 +780,15 @@ class TestAsyncStores:
self, datastore: AsyncDataStore
) -> None:
"""Regression test for #616."""
- async with datastore:
- for i in range(1, 3):
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
- schedule.next_fire_time = trigger.next()
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
+ for i in range(1, 3):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
- schedules = await datastore.acquire_schedules("foo", 3)
- schedules[0].next_fire_time = None
- await datastore.release_schedules("foo", schedules)
+ schedules = await datastore.acquire_schedules("foo", 3)
+ schedules[0].next_fire_time = None
+ await datastore.release_schedules("foo", schedules)
remaining = await datastore.get_schedules({s.id for s in schedules})
assert len(remaining) == 1
@@ -342,15 +798,14 @@ class TestAsyncStores:
self, datastore: AsyncDataStore
) -> None:
"""Regression test for #621."""
- async with datastore:
- for i in range(2):
- trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
- schedule.next_fire_time = trigger.next()
- await datastore.add_schedule(schedule, ConflictPolicy.exception)
+ for i in range(2):
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
+ schedule.next_fire_time = trigger.next()
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
- schedules = await datastore.acquire_schedules("foo", 3)
- await datastore.release_schedules("foo", schedules)
+ schedules = await datastore.acquire_schedules("foo", 3)
+ await datastore.release_schedules("foo", schedules)
remaining = await datastore.get_schedules({s.id for s in schedules})
assert len(remaining) == 2
@@ -363,153 +818,145 @@ class TestAsyncStores:
not released within the lock timeout period.
"""
- async with datastore:
- await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
-
- # First, one scheduler acquires the first available schedule
- acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
- assert len(acquired1) == 1
- assert acquired1[0].id == "s1"
-
- # Try to acquire the schedule just at the threshold (now == acquired_until).
- # This should not yield any schedules.
- freezer.tick(30)
- acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
- assert not acquired2
-
- # Right after that, the schedule should be available
- freezer.tick(1)
- acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
- assert len(acquired3) == 1
- assert acquired3[0].id == "s1"
+ await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
- async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None:
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- jobs = [Job(task_id="task1") for _ in range(2)]
- for job in jobs:
- await datastore.add_job(job)
-
- # The first worker gets the first job in the queue
- jobs1 = await datastore.acquire_jobs("worker1", 1)
- assert len(jobs1) == 1
- assert jobs1[0].id == jobs[0].id
-
- # The second worker gets the second job
- jobs2 = await datastore.acquire_jobs("worker2", 1)
- assert len(jobs2) == 1
- assert jobs2[0].id == jobs[1].id
-
- # The third worker gets nothing
- jobs3 = await datastore.acquire_jobs("worker3", 1)
- assert not jobs3
-
- async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- await datastore.add_job(job)
+ # First, one scheduler acquires the first available schedule
+ acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
+ assert len(acquired1) == 1
+ assert acquired1[0].id == "s1"
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
- return_value="foo",
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.success
- assert result.exception is None
- assert result.return_value == "foo"
+ # Try to acquire the schedule just at the threshold (now == acquired_until).
+ # This should not yield any schedules.
+ freezer.tick(30)
+ acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
+ assert not acquired2
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
+ # Right after that, the schedule should be available
+ freezer.tick(1)
+ acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
+ assert len(acquired3) == 1
+ assert acquired3[0].id == "s1"
- async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None:
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ jobs = [Job(task_id="task1") for _ in range(2)]
+ for job in jobs:
await datastore.add_job(job)
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
- exception=ValueError("foo"),
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.error
- assert isinstance(result.exception, ValueError)
- assert result.exception.args == ("foo",)
- assert result.return_value is None
+ # The first worker gets the first job in the queue
+ jobs1 = await datastore.acquire_jobs("worker1", 1)
+ assert len(jobs1) == 1
+ assert jobs1[0].id == jobs[0].id
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
+ # The second worker gets the second job
+ jobs2 = await datastore.acquire_jobs("worker2", 1)
+ assert len(jobs2) == 1
+ assert jobs2[0].id == jobs[1].id
- async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- await datastore.add_job(job)
+ # The third worker gets nothing
+ jobs3 = await datastore.acquire_jobs("worker3", 1)
+ assert not jobs3
+
+ async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.success,
+ return_value="foo",
+ ),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.success
+ assert result.exception is None
+ assert result.return_value == "foo"
- acquired = await datastore.acquire_jobs("worker_id", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- await datastore.release_job(
- "worker_id",
- acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline
- ),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.missed_start_deadline
- assert result.exception is None
- assert result.return_value is None
+ async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.error,
+ exception=ValueError("foo"),
+ ),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.error
+ assert isinstance(result.exception, ValueError)
+ assert result.exception.args == ("foo",)
+ assert result.return_value is None
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- await datastore.add_job(job)
+ async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker_id", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.missed_start_deadline
+ assert result.exception is None
+ assert result.return_value is None
- acquired = await datastore.acquire_jobs("worker1", 2)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- await datastore.release_job(
- "worker1",
- acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
- )
- result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.cancelled
- assert result.exception is None
- assert result.return_value is None
+ async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ acquired = await datastore.acquire_jobs("worker1", 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ await datastore.release_job(
+ "worker1",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ )
+ result = await datastore.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.cancelled
+ assert result.exception is None
+ assert result.return_value is None
- # Check that the job and its result are gone
- assert not await datastore.get_jobs({acquired[0].id})
- assert not await datastore.get_job_result(acquired[0].id)
+ # Check that the job and its result are gone
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
async def test_acquire_jobs_lock_timeout(
self, datastore: AsyncDataStore, freezer: FrozenDateTimeFactory
@@ -519,51 +966,49 @@ class TestAsyncStores:
released within the lock timeout period.
"""
- async with datastore:
- await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
- await datastore.add_job(job)
-
- # First, one worker acquires the first available job
- acquired = await datastore.acquire_jobs("worker1", 1)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
-
- # Try to acquire the job just at the threshold (now == acquired_until).
- # This should not yield any jobs.
- freezer.tick(30)
- assert not await datastore.acquire_jobs("worker2", 1)
-
- # Right after that, the job should be available
- freezer.tick(1)
- acquired = await datastore.acquire_jobs("worker2", 1)
- assert len(acquired) == 1
- assert acquired[0].id == job.id
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
+ await datastore.add_job(job)
+
+ # First, one worker acquires the first available job
+ acquired = await datastore.acquire_jobs("worker1", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
+
+ # Try to acquire the job just at the threshold (now == acquired_until).
+ # This should not yield any jobs.
+ freezer.tick(30)
+ assert not await datastore.acquire_jobs("worker2", 1)
+
+ # Right after that, the job should be available
+ freezer.tick(1)
+ acquired = await datastore.acquire_jobs("worker2", 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == job.id
async def test_acquire_jobs_max_number_exceeded(
self, datastore: AsyncDataStore
) -> None:
- async with datastore:
- await datastore.add_task(
- Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
- )
- jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
- for job in jobs:
- await datastore.add_job(job)
-
- # Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3
- acquired_jobs = await datastore.acquire_jobs("worker1", 3)
- assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
-
- # Release one job, and the worker should be able to acquire the third job
- await datastore.release_job(
- "worker1",
- acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
- return_value=None,
- ),
- )
- acquired_jobs = await datastore.acquire_jobs("worker1", 3)
- assert [job.id for job in acquired_jobs] == [jobs[2].id]
+ await datastore.add_task(
+ Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
+ )
+ jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
+ for job in jobs:
+ await datastore.add_job(job)
+
+        # Check that only 2 jobs are returned from acquire_jobs() even though the limit was 3
+ acquired_jobs = await datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
+
+ # Release one job, and the worker should be able to acquire the third job
+ await datastore.release_job(
+ "worker1",
+ acquired_jobs[0].task_id,
+ JobResult(
+ job_id=acquired_jobs[0].id,
+ outcome=JobOutcome.success,
+ return_value=None,
+ ),
+ )
+ acquired_jobs = await datastore.acquire_jobs("worker1", 3)
+ assert [job.id for job in acquired_jobs] == [jobs[2].id]
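The async class at the end of the diff follows the same pattern with LocalAsyncEventBroker and, for the in-memory case, AsyncDataStoreAdapter. A hedged sketch of the equivalent standalone setup, assuming only the awaitable start()/stop() contract shown in the TestAsyncDataStores fixtures:

    import anyio

    from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
    from apscheduler.datastores.memory import MemoryDataStore
    from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker

    async def main() -> None:
        broker = LocalAsyncEventBroker()
        await broker.start()
        store = AsyncDataStoreAdapter(MemoryDataStore())
        await store.start(broker)
        try:
            print(await store.get_schedules())  # a fresh store has no schedules
        finally:
            await store.stop()
            await broker.stop()

    anyio.run(main)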