Diffstat (limited to 'tests/test_datastores.py')
-rw-r--r-- | tests/test_datastores.py | 947
1 file changed, 696 insertions, 251 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py index d6376ea..ea5416e 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -1,19 +1,31 @@ from __future__ import annotations -from contextlib import asynccontextmanager +import threading +from collections.abc import Generator +from contextlib import asynccontextmanager, contextmanager from datetime import datetime, timezone from tempfile import TemporaryDirectory -from typing import AsyncGenerator +from typing import Any, AsyncGenerator, cast import anyio import pytest +from _pytest.fixtures import SubRequest from freezegun.api import FrozenDateTimeFactory from pytest_lazyfixture import lazy_fixture -from apscheduler.abc import AsyncDataStore, DataStore, Job, Schedule +from apscheduler.abc import ( + AsyncDataStore, + AsyncEventBroker, + DataStore, + EventBroker, + Job, + Schedule, +) from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter from apscheduler.datastores.memory import MemoryDataStore from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome +from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker +from apscheduler.eventbrokers.local import LocalEventBroker from apscheduler.events import ( Event, ScheduleAdded, @@ -32,6 +44,12 @@ def memory_store() -> DataStore: @pytest.fixture +def adapted_memory_store() -> AsyncDataStore: + store = MemoryDataStore() + return AsyncDataStoreAdapter(store) + + +@pytest.fixture def mongodb_store() -> DataStore: from pymongo import MongoClient @@ -96,39 +114,6 @@ async def asyncpg_store() -> AsyncDataStore: await engine.dispose() -@pytest.fixture( - params=[ - pytest.param(lazy_fixture("memory_store"), id="memory"), - pytest.param(lazy_fixture("sqlite_store"), id="sqlite"), - pytest.param( - lazy_fixture("mongodb_store"), - id="mongodb", - marks=[pytest.mark.external_service], - ), - pytest.param( - lazy_fixture("psycopg2_store"), - id="psycopg2", - marks=[pytest.mark.external_service], - ), - pytest.param( - lazy_fixture("asyncpg_store"), - id="asyncpg", - marks=[pytest.mark.external_service], - ), - pytest.param( - lazy_fixture("mysql_store"), - id="mysql", - marks=[pytest.mark.external_service], - ), - ] -) -async def datastore(request): - if isinstance(request.param, DataStore): - return AsyncDataStoreAdapter(request.param) - else: - return request.param - - @pytest.fixture def schedules() -> list[Schedule]: trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) @@ -144,32 +129,504 @@ def schedules() -> list[Schedule]: return [schedule1, schedule2, schedule3] -@asynccontextmanager -async def capture_events( - datastore: AsyncDataStore, limit: int, event_types: set[type[Event]] | None = None -) -> AsyncGenerator[list[Event], None]: - def listener(event: Event) -> None: - events.append(event) - if len(events) == limit: - limit_event.set() - subscription.unsubscribe() +class TestDataStores: + @contextmanager + def capture_events( + self, + datastore: DataStore, + limit: int, + event_types: set[type[Event]] | None = None, + ) -> Generator[list[Event], None, None]: + def listener(event: Event) -> None: + events.append(event) + if len(events) == limit: + limit_event.set() + subscription.unsubscribe() + + events: list[Event] = [] + limit_event = threading.Event() + subscription = datastore.events.subscribe(listener, event_types) + yield events + if limit: + limit_event.wait(2) + + @pytest.fixture + def event_broker(self) -> Generator[EventBroker, Any, None]: + broker = LocalEventBroker() + broker.start() + yield 
broker + broker.stop() + + @pytest.fixture( + params=[ + pytest.param(lazy_fixture("memory_store"), id="memory"), + pytest.param(lazy_fixture("sqlite_store"), id="sqlite"), + pytest.param( + lazy_fixture("mongodb_store"), + id="mongodb", + marks=[pytest.mark.external_service], + ), + pytest.param( + lazy_fixture("psycopg2_store"), + id="psycopg2", + marks=[pytest.mark.external_service], + ), + pytest.param( + lazy_fixture("mysql_store"), + id="mysql", + marks=[pytest.mark.external_service], + ), + ] + ) + def datastore( + self, request: SubRequest, event_broker: EventBroker + ) -> Generator[DataStore, Any, None]: + datastore = cast(DataStore, request.param) + datastore.start(event_broker) + yield datastore + datastore.stop() + + def test_add_replace_task(self, datastore: DataStore) -> None: + import math - events: list[Event] = [] - limit_event = anyio.Event() - subscription = datastore.events.subscribe(listener, event_types) - yield events - if limit: - with anyio.fail_after(3): - await limit_event.wait() + event_types = {TaskAdded, TaskUpdated} + with self.capture_events(datastore, 3, event_types) as events: + datastore.add_task(Task(id="test_task", func=print)) + datastore.add_task(Task(id="test_task2", func=math.ceil)) + datastore.add_task(Task(id="test_task", func=repr)) + + tasks = datastore.get_tasks() + assert len(tasks) == 2 + assert tasks[0].id == "test_task" + assert tasks[0].func is repr + assert tasks[1].id == "test_task2" + assert tasks[1].func is math.ceil + + received_event = events.pop(0) + assert isinstance(received_event, TaskAdded) + assert received_event.task_id == "test_task" + + received_event = events.pop(0) + assert isinstance(received_event, TaskAdded) + assert received_event.task_id == "test_task2" + + received_event = events.pop(0) + assert isinstance(received_event, TaskUpdated) + assert received_event.task_id == "test_task" + + assert not events + + def test_add_schedules( + self, datastore: DataStore, schedules: list[Schedule] + ) -> None: + with self.capture_events(datastore, 3, {ScheduleAdded}) as events: + for schedule in schedules: + datastore.add_schedule(schedule, ConflictPolicy.exception) + + assert datastore.get_schedules() == schedules + assert datastore.get_schedules({"s1", "s2", "s3"}) == schedules + assert datastore.get_schedules({"s1"}) == [schedules[0]] + assert datastore.get_schedules({"s2"}) == [schedules[1]] + assert datastore.get_schedules({"s3"}) == [schedules[2]] + + for event, schedule in zip(events, schedules): + assert event.schedule_id == schedule.id + assert event.next_fire_time == schedule.next_fire_time + + def test_replace_schedules( + self, datastore: DataStore, schedules: list[Schedule] + ) -> None: + with self.capture_events(datastore, 1, {ScheduleUpdated}) as events: + for schedule in schedules: + datastore.add_schedule(schedule, ConflictPolicy.exception) + + next_fire_time = schedules[2].trigger.next() + schedule = Schedule( + id="s3", + task_id="foo", + trigger=schedules[2].trigger, + args=(), + kwargs={}, + coalesce=CoalescePolicy.earliest, + misfire_grace_time=None, + tags=frozenset(), + ) + schedule.next_fire_time = next_fire_time + datastore.add_schedule(schedule, ConflictPolicy.replace) + + schedules = datastore.get_schedules({schedule.id}) + assert schedules[0].task_id == "foo" + assert schedules[0].next_fire_time == next_fire_time + assert schedules[0].args == () + assert schedules[0].kwargs == {} + assert schedules[0].coalesce is CoalescePolicy.earliest + assert schedules[0].misfire_grace_time is None + assert 
schedules[0].tags == frozenset() + + received_event = events.pop(0) + assert received_event.schedule_id == "s3" + assert received_event.next_fire_time == datetime( + 2020, 9, 15, tzinfo=timezone.utc + ) + assert not events + + def test_remove_schedules( + self, datastore: DataStore, schedules: list[Schedule] + ) -> None: + with self.capture_events(datastore, 2, {ScheduleRemoved}) as events: + for schedule in schedules: + datastore.add_schedule(schedule, ConflictPolicy.exception) + + datastore.remove_schedules(["s1", "s2"]) + assert datastore.get_schedules() == [schedules[2]] + + received_event = events.pop(0) + assert received_event.schedule_id == "s1" + + received_event = events.pop(0) + assert received_event.schedule_id == "s2" + + assert not events + + @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc)) + def test_acquire_release_schedules( + self, datastore: DataStore, schedules: list[Schedule] + ) -> None: + event_types = {ScheduleRemoved, ScheduleUpdated} + with self.capture_events(datastore, 2, event_types) as events: + for schedule in schedules: + datastore.add_schedule(schedule, ConflictPolicy.exception) + + # The first scheduler gets the first due schedule + schedules1 = datastore.acquire_schedules("dummy-id1", 1) + assert len(schedules1) == 1 + assert schedules1[0].id == "s1" + + # The second scheduler gets the second due schedule + schedules2 = datastore.acquire_schedules("dummy-id2", 1) + assert len(schedules2) == 1 + assert schedules2[0].id == "s2" + + # The third scheduler gets nothing + schedules3 = datastore.acquire_schedules("dummy-id3", 1) + assert not schedules3 + + # Update the schedules and check that the job store actually deletes the first + # one and updates the second one + schedules1[0].next_fire_time = None + schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc) + + # Release all the schedules + datastore.release_schedules("dummy-id1", schedules1) + datastore.release_schedules("dummy-id2", schedules2) + + # Check that the first schedule is gone + schedules = datastore.get_schedules() + assert len(schedules) == 2 + assert schedules[0].id == "s2" + assert schedules[1].id == "s3" + + # Check for the appropriate update and delete events + received_event = events.pop(0) + assert isinstance(received_event, ScheduleRemoved) + assert received_event.schedule_id == "s1" + + received_event = events.pop(0) + assert isinstance(received_event, ScheduleUpdated) + assert received_event.schedule_id == "s2" + assert received_event.next_fire_time == datetime( + 2020, 9, 15, tzinfo=timezone.utc + ) + + assert not events + + def test_release_schedule_two_identical_fire_times( + self, datastore: DataStore + ) -> None: + """Regression test for #616.""" + for i in range(1, 3): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + datastore.add_schedule(schedule, ConflictPolicy.exception) + + schedules = datastore.acquire_schedules("foo", 3) + schedules[0].next_fire_time = None + datastore.release_schedules("foo", schedules) + + remaining = datastore.get_schedules({s.id for s in schedules}) + assert len(remaining) == 1 + assert remaining[0].id == schedules[1].id + + def test_release_two_schedules_at_once(self, datastore: DataStore) -> None: + """Regression test for #621.""" + for i in range(2): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", 
trigger=trigger) + schedule.next_fire_time = trigger.next() + datastore.add_schedule(schedule, ConflictPolicy.exception) + + schedules = datastore.acquire_schedules("foo", 3) + datastore.release_schedules("foo", schedules) + + remaining = datastore.get_schedules({s.id for s in schedules}) + assert len(remaining) == 2 + + def test_acquire_schedules_lock_timeout( + self, datastore: DataStore, schedules: list[Schedule], freezer + ) -> None: + """ + Test that a scheduler can acquire schedules that were acquired by another scheduler but + not released within the lock timeout period. + + """ + datastore.add_schedule(schedules[0], ConflictPolicy.exception) + + # First, one scheduler acquires the first available schedule + acquired1 = datastore.acquire_schedules("dummy-id1", 1) + assert len(acquired1) == 1 + assert acquired1[0].id == "s1" + + # Try to acquire the schedule just at the threshold (now == acquired_until). + # This should not yield any schedules. + freezer.tick(30) + acquired2 = datastore.acquire_schedules("dummy-id2", 1) + assert not acquired2 + + # Right after that, the schedule should be available + freezer.tick(1) + acquired3 = datastore.acquire_schedules("dummy-id2", 1) + assert len(acquired3) == 1 + assert acquired3[0].id == "s1" + + def test_acquire_multiple_workers(self, datastore: DataStore) -> None: + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + jobs = [Job(task_id="task1") for _ in range(2)] + for job in jobs: + datastore.add_job(job) + + # The first worker gets the first job in the queue + jobs1 = datastore.acquire_jobs("worker1", 1) + assert len(jobs1) == 1 + assert jobs1[0].id == jobs[0].id + + # The second worker gets the second job + jobs2 = datastore.acquire_jobs("worker2", 1) + assert len(jobs2) == 1 + assert jobs2[0].id == jobs[1].id + + # The third worker gets nothing + jobs3 = datastore.acquire_jobs("worker3", 1) + assert not jobs3 + + def test_job_release_success(self, datastore: DataStore) -> None: + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + acquired = datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult( + job_id=acquired[0].id, + outcome=JobOutcome.success, + return_value="foo", + ), + ) + result = datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.success + assert result.exception is None + assert result.return_value == "foo" + + # Check that the job and its result are gone + assert not datastore.get_jobs({acquired[0].id}) + assert not datastore.get_job_result(acquired[0].id) + + def test_job_release_failure(self, datastore: DataStore) -> None: + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + acquired = datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult( + job_id=acquired[0].id, + outcome=JobOutcome.error, + exception=ValueError("foo"), + ), + ) + result = datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.error + assert isinstance(result.exception, ValueError) + assert result.exception.args == ("foo",) + assert result.return_value is None + + # Check that the job and its result are gone + assert not datastore.get_jobs({acquired[0].id}) + assert not 
datastore.get_job_result(acquired[0].id) + + def test_job_release_missed_deadline(self, datastore: DataStore): + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + acquired = datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + ) + result = datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.missed_start_deadline + assert result.exception is None + assert result.return_value is None + + # Check that the job and its result are gone + assert not datastore.get_jobs({acquired[0].id}) + assert not datastore.get_job_result(acquired[0].id) + + def test_job_release_cancelled(self, datastore: DataStore) -> None: + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + acquired = datastore.acquire_jobs("worker1", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + datastore.release_job( + "worker1", + acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + ) + result = datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.cancelled + assert result.exception is None + assert result.return_value is None + + # Check that the job and its result are gone + assert not datastore.get_jobs({acquired[0].id}) + assert not datastore.get_job_result(acquired[0].id) + + def test_acquire_jobs_lock_timeout( + self, datastore: DataStore, freezer: FrozenDateTimeFactory + ) -> None: + """ + Test that a worker can acquire jobs that were acquired by another scheduler but not + released within the lock timeout period. + + """ + datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + datastore.add_job(job) + + # First, one worker acquires the first available job + acquired = datastore.acquire_jobs("worker1", 1) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + # Try to acquire the job just at the threshold (now == acquired_until). + # This should not yield any jobs. 
+ freezer.tick(30) + assert not datastore.acquire_jobs("worker2", 1) + + # Right after that, the job should be available + freezer.tick(1) + acquired = datastore.acquire_jobs("worker2", 1) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + def test_acquire_jobs_max_number_exceeded(self, datastore: DataStore) -> None: + datastore.add_task( + Task(id="task1", func=asynccontextmanager, max_running_jobs=2) + ) + jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")] + for job in jobs: + datastore.add_job(job) + + # Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3 + acquired_jobs = datastore.acquire_jobs("worker1", 3) + assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]] + + # Release one job, and the worker should be able to acquire the third job + datastore.release_job( + "worker1", + acquired_jobs[0].task_id, + JobResult( + job_id=acquired_jobs[0].id, + outcome=JobOutcome.success, + return_value=None, + ), + ) + acquired_jobs = datastore.acquire_jobs("worker1", 3) + assert [job.id for job in acquired_jobs] == [jobs[2].id] @pytest.mark.anyio -class TestAsyncStores: +class TestAsyncDataStores: + @asynccontextmanager + async def capture_events( + self, + datastore: AsyncDataStore, + limit: int, + event_types: set[type[Event]] | None = None, + ) -> AsyncGenerator[list[Event], None]: + def listener(event: Event) -> None: + events.append(event) + if len(events) == limit: + limit_event.set() + subscription.unsubscribe() + + events: list[Event] = [] + limit_event = anyio.Event() + subscription = datastore.events.subscribe(listener, event_types) + yield events + if limit: + with anyio.fail_after(3): + await limit_event.wait() + + @pytest.fixture + async def event_broker(self) -> AsyncGenerator[AsyncEventBroker, Any]: + broker = LocalAsyncEventBroker() + await broker.start() + yield broker + await broker.stop() + + @pytest.fixture( + params=[ + pytest.param(lazy_fixture("adapted_memory_store"), id="memory"), + pytest.param( + lazy_fixture("asyncpg_store"), + id="asyncpg", + marks=[pytest.mark.external_service], + ), + ] + ) + async def datastore( + self, request: SubRequest, event_broker: AsyncEventBroker + ) -> AsyncGenerator[AsyncDataStore, Any]: + datastore = cast(AsyncDataStore, request.param) + await datastore.start(event_broker) + yield datastore + await datastore.stop() + async def test_add_replace_task(self, datastore: AsyncDataStore) -> None: import math event_types = {TaskAdded, TaskUpdated} - async with datastore, capture_events(datastore, 3, event_types) as events: + async with self.capture_events(datastore, 3, event_types) as events: await datastore.add_task(Task(id="test_task", func=print)) await datastore.add_task(Task(id="test_task2", func=math.ceil)) await datastore.add_task(Task(id="test_task", func=repr)) @@ -198,7 +655,7 @@ class TestAsyncStores: async def test_add_schedules( self, datastore: AsyncDataStore, schedules: list[Schedule] ) -> None: - async with datastore, capture_events(datastore, 3, {ScheduleAdded}) as events: + async with self.capture_events(datastore, 3, {ScheduleAdded}) as events: for schedule in schedules: await datastore.add_schedule(schedule, ConflictPolicy.exception) @@ -215,7 +672,7 @@ class TestAsyncStores: async def test_replace_schedules( self, datastore: AsyncDataStore, schedules: list[Schedule] ) -> None: - async with datastore, capture_events(datastore, 1, {ScheduleUpdated}) as events: + async with self.capture_events(datastore, 1, {ScheduleUpdated}) as events: for 
schedule in schedules: await datastore.add_schedule(schedule, ConflictPolicy.exception) @@ -252,7 +709,7 @@ class TestAsyncStores: async def test_remove_schedules( self, datastore: AsyncDataStore, schedules: list[Schedule] ) -> None: - async with datastore, capture_events(datastore, 2, {ScheduleRemoved}) as events: + async with self.capture_events(datastore, 2, {ScheduleRemoved}) as events: for schedule in schedules: await datastore.add_schedule(schedule, ConflictPolicy.exception) @@ -272,7 +729,7 @@ class TestAsyncStores: self, datastore: AsyncDataStore, schedules: list[Schedule] ) -> None: event_types = {ScheduleRemoved, ScheduleUpdated} - async with datastore, capture_events(datastore, 2, event_types) as events: + async with self.capture_events(datastore, 2, event_types) as events: for schedule in schedules: await datastore.add_schedule(schedule, ConflictPolicy.exception) @@ -323,16 +780,15 @@ class TestAsyncStores: self, datastore: AsyncDataStore ) -> None: """Regression test for #616.""" - async with datastore: - for i in range(1, 3): - trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) - schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) - schedule.next_fire_time = trigger.next() - await datastore.add_schedule(schedule, ConflictPolicy.exception) + for i in range(1, 3): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + await datastore.add_schedule(schedule, ConflictPolicy.exception) - schedules = await datastore.acquire_schedules("foo", 3) - schedules[0].next_fire_time = None - await datastore.release_schedules("foo", schedules) + schedules = await datastore.acquire_schedules("foo", 3) + schedules[0].next_fire_time = None + await datastore.release_schedules("foo", schedules) remaining = await datastore.get_schedules({s.id for s in schedules}) assert len(remaining) == 1 @@ -342,15 +798,14 @@ class TestAsyncStores: self, datastore: AsyncDataStore ) -> None: """Regression test for #621.""" - async with datastore: - for i in range(2): - trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) - schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) - schedule.next_fire_time = trigger.next() - await datastore.add_schedule(schedule, ConflictPolicy.exception) + for i in range(2): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger) + schedule.next_fire_time = trigger.next() + await datastore.add_schedule(schedule, ConflictPolicy.exception) - schedules = await datastore.acquire_schedules("foo", 3) - await datastore.release_schedules("foo", schedules) + schedules = await datastore.acquire_schedules("foo", 3) + await datastore.release_schedules("foo", schedules) remaining = await datastore.get_schedules({s.id for s in schedules}) assert len(remaining) == 2 @@ -363,153 +818,145 @@ class TestAsyncStores: not released within the lock timeout period. """ - async with datastore: - await datastore.add_schedule(schedules[0], ConflictPolicy.exception) - - # First, one scheduler acquires the first available schedule - acquired1 = await datastore.acquire_schedules("dummy-id1", 1) - assert len(acquired1) == 1 - assert acquired1[0].id == "s1" - - # Try to acquire the schedule just at the threshold (now == acquired_until). - # This should not yield any schedules. 
- freezer.tick(30) - acquired2 = await datastore.acquire_schedules("dummy-id2", 1) - assert not acquired2 - - # Right after that, the schedule should be available - freezer.tick(1) - acquired3 = await datastore.acquire_schedules("dummy-id2", 1) - assert len(acquired3) == 1 - assert acquired3[0].id == "s1" + await datastore.add_schedule(schedules[0], ConflictPolicy.exception) - async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None: - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - jobs = [Job(task_id="task1") for _ in range(2)] - for job in jobs: - await datastore.add_job(job) - - # The first worker gets the first job in the queue - jobs1 = await datastore.acquire_jobs("worker1", 1) - assert len(jobs1) == 1 - assert jobs1[0].id == jobs[0].id - - # The second worker gets the second job - jobs2 = await datastore.acquire_jobs("worker2", 1) - assert len(jobs2) == 1 - assert jobs2[0].id == jobs[1].id - - # The third worker gets nothing - jobs3 = await datastore.acquire_jobs("worker3", 1) - assert not jobs3 - - async def test_job_release_success(self, datastore: AsyncDataStore) -> None: - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") - await datastore.add_job(job) + # First, one scheduler acquires the first available schedule + acquired1 = await datastore.acquire_schedules("dummy-id1", 1) + assert len(acquired1) == 1 + assert acquired1[0].id == "s1" - acquired = await datastore.acquire_jobs("worker_id", 2) - assert len(acquired) == 1 - assert acquired[0].id == job.id - - await datastore.release_job( - "worker_id", - acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, - return_value="foo", - ), - ) - result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.success - assert result.exception is None - assert result.return_value == "foo" + # Try to acquire the schedule just at the threshold (now == acquired_until). + # This should not yield any schedules. 
+ freezer.tick(30) + acquired2 = await datastore.acquire_schedules("dummy-id2", 1) + assert not acquired2 - # Check that the job and its result are gone - assert not await datastore.get_jobs({acquired[0].id}) - assert not await datastore.get_job_result(acquired[0].id) + # Right after that, the schedule should be available + freezer.tick(1) + acquired3 = await datastore.acquire_schedules("dummy-id2", 1) + assert len(acquired3) == 1 + assert acquired3[0].id == "s1" - async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None: + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + jobs = [Job(task_id="task1") for _ in range(2)] + for job in jobs: await datastore.add_job(job) - acquired = await datastore.acquire_jobs("worker_id", 2) - assert len(acquired) == 1 - assert acquired[0].id == job.id - - await datastore.release_job( - "worker_id", - acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, - exception=ValueError("foo"), - ), - ) - result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.error - assert isinstance(result.exception, ValueError) - assert result.exception.args == ("foo",) - assert result.return_value is None + # The first worker gets the first job in the queue + jobs1 = await datastore.acquire_jobs("worker1", 1) + assert len(jobs1) == 1 + assert jobs1[0].id == jobs[0].id - # Check that the job and its result are gone - assert not await datastore.get_jobs({acquired[0].id}) - assert not await datastore.get_job_result(acquired[0].id) + # The second worker gets the second job + jobs2 = await datastore.acquire_jobs("worker2", 1) + assert len(jobs2) == 1 + assert jobs2[0].id == jobs[1].id - async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") - await datastore.add_job(job) + # The third worker gets nothing + jobs3 = await datastore.acquire_jobs("worker3", 1) + assert not jobs3 + + async def test_job_release_success(self, datastore: AsyncDataStore) -> None: + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + acquired = await datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + await datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult( + job_id=acquired[0].id, + outcome=JobOutcome.success, + return_value="foo", + ), + ) + result = await datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.success + assert result.exception is None + assert result.return_value == "foo" - acquired = await datastore.acquire_jobs("worker_id", 2) - assert len(acquired) == 1 - assert acquired[0].id == job.id + # Check that the job and its result are gone + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - await datastore.release_job( - "worker_id", - acquired[0].task_id, - JobResult( - job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline - ), - ) - result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.missed_start_deadline - assert result.exception is None - 
assert result.return_value is None + async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + acquired = await datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + await datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult( + job_id=acquired[0].id, + outcome=JobOutcome.error, + exception=ValueError("foo"), + ), + ) + result = await datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.error + assert isinstance(result.exception, ValueError) + assert result.exception.args == ("foo",) + assert result.return_value is None - # Check that the job and its result are gone - assert not await datastore.get_jobs({acquired[0].id}) - assert not await datastore.get_job_result(acquired[0].id) + # Check that the job and its result are gone + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") - await datastore.add_job(job) + async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + acquired = await datastore.acquire_jobs("worker_id", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + await datastore.release_job( + "worker_id", + acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + ) + result = await datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.missed_start_deadline + assert result.exception is None + assert result.return_value is None - acquired = await datastore.acquire_jobs("worker1", 2) - assert len(acquired) == 1 - assert acquired[0].id == job.id + # Check that the job and its result are gone + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - await datastore.release_job( - "worker1", - acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), - ) - result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.cancelled - assert result.exception is None - assert result.return_value is None + async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + acquired = await datastore.acquire_jobs("worker1", 2) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + await datastore.release_job( + "worker1", + acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + ) + result = await datastore.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.cancelled + assert result.exception is None + assert result.return_value is None - # Check that the job and its result are gone - assert not await datastore.get_jobs({acquired[0].id}) - assert not await datastore.get_job_result(acquired[0].id) + # Check that the job and its result are gone + assert not await datastore.get_jobs({acquired[0].id}) + assert not await 
datastore.get_job_result(acquired[0].id) async def test_acquire_jobs_lock_timeout( self, datastore: AsyncDataStore, freezer: FrozenDateTimeFactory @@ -519,51 +966,49 @@ class TestAsyncStores: released within the lock timeout period. """ - async with datastore: - await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") - await datastore.add_job(job) - - # First, one worker acquires the first available job - acquired = await datastore.acquire_jobs("worker1", 1) - assert len(acquired) == 1 - assert acquired[0].id == job.id - - # Try to acquire the job just at the threshold (now == acquired_until). - # This should not yield any jobs. - freezer.tick(30) - assert not await datastore.acquire_jobs("worker2", 1) - - # Right after that, the job should be available - freezer.tick(1) - acquired = await datastore.acquire_jobs("worker2", 1) - assert len(acquired) == 1 - assert acquired[0].id == job.id + await datastore.add_task(Task(id="task1", func=asynccontextmanager)) + job = Job(task_id="task1") + await datastore.add_job(job) + + # First, one worker acquires the first available job + acquired = await datastore.acquire_jobs("worker1", 1) + assert len(acquired) == 1 + assert acquired[0].id == job.id + + # Try to acquire the job just at the threshold (now == acquired_until). + # This should not yield any jobs. + freezer.tick(30) + assert not await datastore.acquire_jobs("worker2", 1) + + # Right after that, the job should be available + freezer.tick(1) + acquired = await datastore.acquire_jobs("worker2", 1) + assert len(acquired) == 1 + assert acquired[0].id == job.id async def test_acquire_jobs_max_number_exceeded( self, datastore: AsyncDataStore ) -> None: - async with datastore: - await datastore.add_task( - Task(id="task1", func=asynccontextmanager, max_running_jobs=2) - ) - jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")] - for job in jobs: - await datastore.add_job(job) - - # Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3 - acquired_jobs = await datastore.acquire_jobs("worker1", 3) - assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]] - - # Release one job, and the worker should be able to acquire the third job - await datastore.release_job( - "worker1", - acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, - return_value=None, - ), - ) - acquired_jobs = await datastore.acquire_jobs("worker1", 3) - assert [job.id for job in acquired_jobs] == [jobs[2].id] + await datastore.add_task( + Task(id="task1", func=asynccontextmanager, max_running_jobs=2) + ) + jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")] + for job in jobs: + await datastore.add_job(job) + + # Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3 + acquired_jobs = await datastore.acquire_jobs("worker1", 3) + assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]] + + # Release one job, and the worker should be able to acquire the third job + await datastore.release_job( + "worker1", + acquired_jobs[0].task_id, + JobResult( + job_id=acquired_jobs[0].id, + outcome=JobOutcome.success, + return_value=None, + ), + ) + acquired_jobs = await datastore.acquire_jobs("worker1", 3) + assert [job.id for job in acquired_jobs] == [jobs[2].id] |