From 2a4eb36499f976e4da6b4ff18880b8292d067975 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 11 Sep 2021 23:03:46 +0300 Subject: Applied pytest-lazy-fixture to data stores too --- tests/conftest.py | 113 +--------------- tests/test_datastores.py | 327 +++++++++++++++++++++++++++++++---------------- 2 files changed, 218 insertions(+), 222 deletions(-) (limited to 'tests') diff --git a/tests/conftest.py b/tests/conftest.py index 135c18d..89c4510 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,13 +1,9 @@ import sys -from contextlib import asynccontextmanager, contextmanager -from tempfile import TemporaryDirectory -from typing import AsyncContextManager, AsyncGenerator, ContextManager, Generator, Optional +from typing import Optional import pytest -from apscheduler.abc import AsyncDataStore, DataStore, Serializer -from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter -from apscheduler.datastores.memory import MemoryDataStore +from apscheduler.abc import Serializer from apscheduler.serializers.cbor import CBORSerializer from apscheduler.serializers.json import JSONSerializer from apscheduler.serializers.pickle import PickleSerializer @@ -35,108 +31,3 @@ def serializer(request) -> Optional[Serializer]: @pytest.fixture def anyio_backend() -> 'str': return 'asyncio' - - -@contextmanager -def setup_memory_store() -> Generator[DataStore, None, None]: - yield MemoryDataStore() - - -@contextmanager -def setup_mongodb_store() -> Generator[DataStore, None, None]: - from pymongo import MongoClient - - from apscheduler.datastores.mongodb import MongoDBDataStore - - with MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) as client: - yield MongoDBDataStore(client, start_from_scratch=True) - - -@contextmanager -def setup_sqlite_store() -> Generator[DataStore, None, None]: - from sqlalchemy.future import create_engine - - from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore - - with TemporaryDirectory('sqlite_') as tempdir: - engine = create_engine(f'sqlite:///{tempdir}/test.db') - try: - yield SQLAlchemyDataStore(engine) - finally: - engine.dispose() - - -@contextmanager -def setup_psycopg2_store() -> Generator[DataStore, None, None]: - from sqlalchemy.future import create_engine - - from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore - - engine = create_engine('postgresql+psycopg2://postgres:secret@localhost/testdb') - try: - yield SQLAlchemyDataStore(engine, start_from_scratch=True) - finally: - engine.dispose() - - -@contextmanager -def setup_mysql_store() -> Generator[DataStore, None, None]: - from sqlalchemy.future import create_engine - - from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore - - engine = create_engine('mysql+pymysql://root:secret@localhost/testdb') - try: - yield SQLAlchemyDataStore(engine, start_from_scratch=True) - finally: - engine.dispose() - - -@asynccontextmanager -async def setup_asyncpg_store() -> AsyncGenerator[AsyncDataStore, None]: - from sqlalchemy.ext.asyncio import create_async_engine - - from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore - - engine = create_async_engine('postgresql+asyncpg://postgres:secret@localhost/testdb', - future=True) - try: - yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True) - finally: - await engine.dispose() - - -@pytest.fixture(params=[ - pytest.param(setup_memory_store, id='memory'), - pytest.param(setup_sqlite_store, id='sqlite'), - pytest.param(setup_mongodb_store, id='mongodb', 
marks=[pytest.mark.externaldb]), - pytest.param(setup_psycopg2_store, id='psycopg2', marks=[pytest.mark.externaldb]), - pytest.param(setup_mysql_store, id='mysql', marks=[pytest.mark.externaldb]) -]) -def setup_sync_store(request) -> ContextManager[DataStore]: - return request.param - - -@pytest.fixture(params=[ - pytest.param(setup_asyncpg_store, id='asyncpg', marks=[pytest.mark.externaldb]) -]) -def setup_async_store(request) -> AsyncContextManager[AsyncDataStore]: - return request.param - - -@pytest.fixture(params=[ - pytest.param(setup_memory_store, id='memory'), - pytest.param(setup_sqlite_store, id='sqlite'), - pytest.param(setup_mongodb_store, id='mongodb', marks=[pytest.mark.externaldb]), - pytest.param(setup_psycopg2_store, id='psycopg2', marks=[pytest.mark.externaldb]), - pytest.param(setup_mysql_store, id='mysql', marks=[pytest.mark.externaldb]), - pytest.param(setup_asyncpg_store, id='asyncpg', marks=[pytest.mark.externaldb]) -]) -async def datastore_cm(request): - cm = request.param() - if isinstance(cm, ContextManager): - with cm as store: - yield AsyncDataStoreAdapter(store) - else: - async with cm as store: - yield store diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 74db6e7..4b01662 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -2,13 +2,16 @@ from __future__ import annotations from contextlib import asynccontextmanager from datetime import datetime, timezone -from typing import AsyncContextManager, AsyncGenerator, List, Optional, Set, Type +from tempfile import TemporaryDirectory +from typing import AsyncGenerator, List, Optional, Set, Type import anyio import pytest from freezegun.api import FrozenDateTimeFactory -from apscheduler.abc import AsyncDataStore, Job, Schedule +from apscheduler.abc import AsyncDataStore, DataStore, Job, Schedule +from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter +from apscheduler.datastores.memory import MemoryDataStore from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome from apscheduler.events import ( Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskUpdated) @@ -16,6 +19,114 @@ from apscheduler.structures import JobResult, Task from apscheduler.triggers.date import DateTrigger +@pytest.fixture +def memory_store() -> DataStore: + yield MemoryDataStore() + + +@pytest.fixture +def mongodb_store() -> DataStore: + from pymongo import MongoClient + + from apscheduler.datastores.mongodb import MongoDBDataStore + + with MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) as client: + yield MongoDBDataStore(client, start_from_scratch=True) + + +@pytest.fixture +def sqlite_store() -> DataStore: + from sqlalchemy.future import create_engine + + from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore + + with TemporaryDirectory('sqlite_') as tempdir: + engine = create_engine(f'sqlite:///{tempdir}/test.db') + try: + yield SQLAlchemyDataStore(engine) + finally: + engine.dispose() + + +@pytest.fixture +def psycopg2_store() -> DataStore: + from sqlalchemy.future import create_engine + + from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore + + engine = create_engine('postgresql+psycopg2://postgres:secret@localhost/testdb') + try: + yield SQLAlchemyDataStore(engine, start_from_scratch=True) + finally: + engine.dispose() + + +@pytest.fixture +def mysql_store() -> DataStore: + from sqlalchemy.future import create_engine + + from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore + + engine = 
create_engine('mysql+pymysql://root:secret@localhost/testdb')
+    try:
+        yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+    finally:
+        engine.dispose()
+
+
+@pytest.fixture
+async def asyncpg_store() -> AsyncDataStore:
+    from sqlalchemy.ext.asyncio import create_async_engine
+
+    from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
+
+    engine = create_async_engine('postgresql+asyncpg://postgres:secret@localhost/testdb',
+                                 future=True)
+    try:
+        yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True)
+    finally:
+        await engine.dispose()
+
+
+@pytest.fixture(params=[
+    pytest.param(pytest.lazy_fixture('memory_store'), id='memory'),
+    pytest.param(pytest.lazy_fixture('sqlite_store'), id='sqlite'),
+    pytest.param(pytest.lazy_fixture('mongodb_store'), id='mongodb',
+                 marks=[pytest.mark.external_service]),
+    pytest.param(pytest.lazy_fixture('psycopg2_store'), id='psycopg2',
+                 marks=[pytest.mark.external_service]),
+    pytest.param(pytest.lazy_fixture('mysql_store'), id='mysql',
+                 marks=[pytest.mark.external_service])
+])
+def sync_store(request) -> DataStore:
+    return request.param
+
+
+@pytest.fixture(params=[
+    pytest.param(pytest.lazy_fixture('asyncpg_store'), id='asyncpg',
+                 marks=[pytest.mark.external_service])
+])
+def async_store(request) -> AsyncDataStore:
+    return request.param
+
+
+@pytest.fixture(params=[
+    pytest.param(pytest.lazy_fixture('memory_store'), id='memory'),
+    pytest.param(pytest.lazy_fixture('sqlite_store'), id='sqlite'),
+    pytest.param(pytest.lazy_fixture('mongodb_store'), id='mongodb',
+                 marks=[pytest.mark.external_service]),
+    pytest.param(pytest.lazy_fixture('psycopg2_store'), id='psycopg2',
+                 marks=[pytest.mark.external_service]),
+    pytest.param(pytest.lazy_fixture('mysql_store'), id='mysql',
+                 marks=[pytest.mark.external_service])
+])
+async def datastore(request):
+    if isinstance(request.param, DataStore):
+        return AsyncDataStoreAdapter(request.param)
+    else:
+        return request.param
+
+
 @pytest.fixture
 def schedules() -> List[Schedule]:
     trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
@@ -33,18 +144,18 @@ def schedules() -> List[Schedule]:
 
 @asynccontextmanager
 async def capture_events(
-        store: AsyncDataStore, limit: int,
+        datastore: AsyncDataStore, limit: int,
         event_types: Optional[Set[Type[Event]]] = None
 ) -> AsyncGenerator[List[Event], None]:
     def listener(event: Event) -> None:
         events.append(event)
         if len(events) == limit:
             limit_event.set()
-            store.events.unsubscribe(token)
+            datastore.events.unsubscribe(token)
 
     events: List[Event] = []
     limit_event = anyio.Event()
-    token = store.events.subscribe(listener, event_types)
+    token = datastore.events.subscribe(listener, event_types)
     yield events
     if limit:
         with anyio.fail_after(3):
@@ -53,17 +164,16 @@ async def capture_events(
 
 @pytest.mark.anyio
 class TestAsyncStores:
-    async def test_add_replace_task(
-            self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
+    async def test_add_replace_task(self, datastore: AsyncDataStore) -> None:
         import math
 
         event_types = {TaskAdded, TaskUpdated}
-        async with datastore_cm as store, capture_events(store, 3, event_types) as events:
-            await store.add_task(Task(id='test_task', func=print))
-            await store.add_task(Task(id='test_task2', func=math.ceil))
-            await store.add_task(Task(id='test_task', func=repr))
+        async with datastore, capture_events(datastore, 3, event_types) as events:
+            await datastore.add_task(Task(id='test_task', func=print))
+            await datastore.add_task(Task(id='test_task2', func=math.ceil))
+            await 
datastore.add_task(Task(id='test_task', func=repr)) - tasks = await store.get_tasks() + tasks = await datastore.get_tasks() assert len(tasks) == 2 assert tasks[0].id == 'test_task' assert tasks[0].func is repr @@ -84,36 +194,36 @@ class TestAsyncStores: assert not events - async def test_add_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore], + async def test_add_schedules(self, datastore: AsyncDataStore, schedules: List[Schedule]) -> None: - async with datastore_cm as store, capture_events(store, 3, {ScheduleAdded}) as events: + async with datastore, capture_events(datastore, 3, {ScheduleAdded}) as events: for schedule in schedules: - await store.add_schedule(schedule, ConflictPolicy.exception) + await datastore.add_schedule(schedule, ConflictPolicy.exception) - assert await store.get_schedules() == schedules - assert await store.get_schedules({'s1', 's2', 's3'}) == schedules - assert await store.get_schedules({'s1'}) == [schedules[0]] - assert await store.get_schedules({'s2'}) == [schedules[1]] - assert await store.get_schedules({'s3'}) == [schedules[2]] + assert await datastore.get_schedules() == schedules + assert await datastore.get_schedules({'s1', 's2', 's3'}) == schedules + assert await datastore.get_schedules({'s1'}) == [schedules[0]] + assert await datastore.get_schedules({'s2'}) == [schedules[1]] + assert await datastore.get_schedules({'s3'}) == [schedules[2]] for event, schedule in zip(events, schedules): assert event.schedule_id == schedule.id assert event.next_fire_time == schedule.next_fire_time - async def test_replace_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore], + async def test_replace_schedules(self, datastore: AsyncDataStore, schedules: List[Schedule]) -> None: - async with datastore_cm as store, capture_events(store, 1, {ScheduleUpdated}) as events: + async with datastore, capture_events(datastore, 1, {ScheduleUpdated}) as events: for schedule in schedules: - await store.add_schedule(schedule, ConflictPolicy.exception) + await datastore.add_schedule(schedule, ConflictPolicy.exception) next_fire_time = schedules[2].trigger.next() schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(), kwargs={}, coalesce=CoalescePolicy.earliest, misfire_grace_time=None, tags=frozenset()) schedule.next_fire_time = next_fire_time - await store.add_schedule(schedule, ConflictPolicy.replace) + await datastore.add_schedule(schedule, ConflictPolicy.replace) - schedules = await store.get_schedules({schedule.id}) + schedules = await datastore.get_schedules({schedule.id}) assert schedules[0].task_id == 'foo' assert schedules[0].next_fire_time == next_fire_time assert schedules[0].args == () @@ -127,14 +237,14 @@ class TestAsyncStores: assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) assert not events - async def test_remove_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore], + async def test_remove_schedules(self, datastore: AsyncDataStore, schedules: List[Schedule]) -> None: - async with datastore_cm as store, capture_events(store, 2, {ScheduleRemoved}) as events: + async with datastore, capture_events(datastore, 2, {ScheduleRemoved}) as events: for schedule in schedules: - await store.add_schedule(schedule, ConflictPolicy.exception) + await datastore.add_schedule(schedule, ConflictPolicy.exception) - await store.remove_schedules(['s1', 's2']) - assert await store.get_schedules() == [schedules[2]] + await datastore.remove_schedules(['s1', 's2']) + assert await 
datastore.get_schedules() == [schedules[2]] received_event = events.pop(0) assert received_event.schedule_id == 's1' @@ -146,24 +256,24 @@ class TestAsyncStores: @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc)) async def test_acquire_release_schedules( - self, datastore_cm, schedules: List[Schedule]) -> None: + self, datastore: AsyncDataStore, schedules: List[Schedule]) -> None: event_types = {ScheduleRemoved, ScheduleUpdated} - async with datastore_cm as store, capture_events(store, 2, event_types) as events: + async with datastore, capture_events(datastore, 2, event_types) as events: for schedule in schedules: - await store.add_schedule(schedule, ConflictPolicy.exception) + await datastore.add_schedule(schedule, ConflictPolicy.exception) # The first scheduler gets the first due schedule - schedules1 = await store.acquire_schedules('dummy-id1', 1) + schedules1 = await datastore.acquire_schedules('dummy-id1', 1) assert len(schedules1) == 1 assert schedules1[0].id == 's1' # The second scheduler gets the second due schedule - schedules2 = await store.acquire_schedules('dummy-id2', 1) + schedules2 = await datastore.acquire_schedules('dummy-id2', 1) assert len(schedules2) == 1 assert schedules2[0].id == 's2' # The third scheduler gets nothing - schedules3 = await store.acquire_schedules('dummy-id3', 1) + schedules3 = await datastore.acquire_schedules('dummy-id3', 1) assert not schedules3 # Update the schedules and check that the job store actually deletes the first @@ -172,11 +282,11 @@ class TestAsyncStores: schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc) # Release all the schedules - await store.release_schedules('dummy-id1', schedules1) - await store.release_schedules('dummy-id2', schedules2) + await datastore.release_schedules('dummy-id1', schedules1) + await datastore.release_schedules('dummy-id2', schedules2) # Check that the first schedule is gone - schedules = await store.get_schedules() + schedules = await datastore.get_schedules() assert len(schedules) == 2 assert schedules[0].id == 's2' assert schedules[1].id == 's3' @@ -194,192 +304,187 @@ class TestAsyncStores: assert not events async def test_acquire_schedules_lock_timeout( - self, datastore_cm, schedules: List[Schedule], freezer) -> None: + self, datastore: AsyncDataStore, schedules: List[Schedule], freezer) -> None: """ Test that a scheduler can acquire schedules that were acquired by another scheduler but not released within the lock timeout period. """ - async with datastore_cm as store: - await store.add_schedule(schedules[0], ConflictPolicy.exception) + async with datastore: + await datastore.add_schedule(schedules[0], ConflictPolicy.exception) # First, one scheduler acquires the first available schedule - acquired1 = await store.acquire_schedules('dummy-id1', 1) + acquired1 = await datastore.acquire_schedules('dummy-id1', 1) assert len(acquired1) == 1 assert acquired1[0].id == 's1' # Try to acquire the schedule just at the threshold (now == acquired_until). # This should not yield any schedules. 
freezer.tick(30) - acquired2 = await store.acquire_schedules('dummy-id2', 1) + acquired2 = await datastore.acquire_schedules('dummy-id2', 1) assert not acquired2 # Right after that, the schedule should be available freezer.tick(1) - acquired3 = await store.acquire_schedules('dummy-id2', 1) + acquired3 = await datastore.acquire_schedules('dummy-id2', 1) assert len(acquired3) == 1 assert acquired3[0].id == 's1' - async def test_acquire_multiple_workers( - self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None: - async with datastore_cm as store: - await store.add_task(Task(id='task1', func=asynccontextmanager)) + async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None: + async with datastore: + await datastore.add_task(Task(id='task1', func=asynccontextmanager)) jobs = [Job(task_id='task1') for _ in range(2)] for job in jobs: - await store.add_job(job) + await datastore.add_job(job) # The first worker gets the first job in the queue - jobs1 = await store.acquire_jobs('worker1', 1) + jobs1 = await datastore.acquire_jobs('worker1', 1) assert len(jobs1) == 1 assert jobs1[0].id == jobs[0].id # The second worker gets the second job - jobs2 = await store.acquire_jobs('worker2', 1) + jobs2 = await datastore.acquire_jobs('worker2', 1) assert len(jobs2) == 1 assert jobs2[0].id == jobs[1].id # The third worker gets nothing - jobs3 = await store.acquire_jobs('worker3', 1) + jobs3 = await datastore.acquire_jobs('worker3', 1) assert not jobs3 - async def test_job_release_success( - self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None: - async with datastore_cm as store: - await store.add_task(Task(id='task1', func=asynccontextmanager)) + async def test_job_release_success(self, datastore: AsyncDataStore) -> None: + async with datastore: + await datastore.add_task(Task(id='task1', func=asynccontextmanager)) job = Job(task_id='task1') - await store.add_job(job) + await datastore.add_job(job) - acquired = await store.acquire_jobs('worker_id', 2) + acquired = await datastore.acquire_jobs('worker_id', 2) assert len(acquired) == 1 assert acquired[0].id == job.id - await store.release_job( + await datastore.release_job( 'worker_id', acquired[0].task_id, JobResult(job_id=acquired[0].id, outcome=JobOutcome.success, return_value='foo')) - result = await store.get_job_result(acquired[0].id) + result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.success assert result.exception is None assert result.return_value == 'foo' # Check that the job and its result are gone - assert not await store.get_jobs({acquired[0].id}) - assert not await store.get_job_result(acquired[0].id) + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - async def test_job_release_failure( - self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None: - async with datastore_cm as store: - await store.add_task(Task(id='task1', func=asynccontextmanager)) + async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: + async with datastore: + await datastore.add_task(Task(id='task1', func=asynccontextmanager)) job = Job(task_id='task1') - await store.add_job(job) + await datastore.add_job(job) - acquired = await store.acquire_jobs('worker_id', 2) + acquired = await datastore.acquire_jobs('worker_id', 2) assert len(acquired) == 1 assert acquired[0].id == job.id - await store.release_job( + await datastore.release_job( 'worker_id', acquired[0].task_id, JobResult(job_id=acquired[0].id, 
outcome=JobOutcome.failure, exception=ValueError('foo'))) - result = await store.get_job_result(acquired[0].id) + result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.failure assert isinstance(result.exception, ValueError) assert result.exception.args == ('foo',) assert result.return_value is None # Check that the job and its result are gone - assert not await store.get_jobs({acquired[0].id}) - assert not await store.get_job_result(acquired[0].id) + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - async def test_job_release_missed_deadline( - self, datastore_cm: AsyncContextManager[AsyncDataStore]): - async with datastore_cm as store: - await store.add_task(Task(id='task1', func=asynccontextmanager)) + async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): + async with datastore: + await datastore.add_task(Task(id='task1', func=asynccontextmanager)) job = Job(task_id='task1') - await store.add_job(job) + await datastore.add_job(job) - acquired = await store.acquire_jobs('worker_id', 2) + acquired = await datastore.acquire_jobs('worker_id', 2) assert len(acquired) == 1 assert acquired[0].id == job.id - await store.release_job( + await datastore.release_job( 'worker_id', acquired[0].task_id, JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline)) - result = await store.get_job_result(acquired[0].id) + result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline assert result.exception is None assert result.return_value is None # Check that the job and its result are gone - assert not await store.get_jobs({acquired[0].id}) - assert not await store.get_job_result(acquired[0].id) + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - async def test_job_release_cancelled( - self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None: - async with datastore_cm as store: - await store.add_task(Task(id='task1', func=asynccontextmanager)) + async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: + async with datastore: + await datastore.add_task(Task(id='task1', func=asynccontextmanager)) job = Job(task_id='task1') - await store.add_job(job) + await datastore.add_job(job) - acquired = await store.acquire_jobs('worker1', 2) + acquired = await datastore.acquire_jobs('worker1', 2) assert len(acquired) == 1 assert acquired[0].id == job.id - await store.release_job('worker1', acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled)) - result = await store.get_job_result(acquired[0].id) + await datastore.release_job( + 'worker1', acquired[0].task_id, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled)) + result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled assert result.exception is None assert result.return_value is None # Check that the job and its result are gone - assert not await store.get_jobs({acquired[0].id}) - assert not await store.get_job_result(acquired[0].id) + assert not await datastore.get_jobs({acquired[0].id}) + assert not await datastore.get_job_result(acquired[0].id) - async def test_acquire_jobs_lock_timeout( - self, datastore_cm: AsyncContextManager[AsyncDataStore], - freezer: FrozenDateTimeFactory) -> None: + async def test_acquire_jobs_lock_timeout(self, datastore: AsyncDataStore, + freezer: 
FrozenDateTimeFactory) -> None:
         """
         Test that a worker can acquire jobs that were acquired by another scheduler but not
         released within the lock timeout period.
         """
-        async with datastore_cm as store:
-            await store.add_task(Task(id='task1', func=asynccontextmanager))
+        async with datastore:
+            await datastore.add_task(Task(id='task1', func=asynccontextmanager))
             job = Job(task_id='task1')
-            await store.add_job(job)
+            await datastore.add_job(job)
 
             # First, one worker acquires the first available job
-            acquired = await store.acquire_jobs('worker1', 1)
+            acquired = await datastore.acquire_jobs('worker1', 1)
             assert len(acquired) == 1
             assert acquired[0].id == job.id
 
             # Try to acquire the job just at the threshold (now == acquired_until).
             # This should not yield any jobs.
             freezer.tick(30)
-            assert not await store.acquire_jobs('worker2', 1)
+            assert not await datastore.acquire_jobs('worker2', 1)
 
             # Right after that, the job should be available
             freezer.tick(1)
-            acquired = await store.acquire_jobs('worker2', 1)
+            acquired = await datastore.acquire_jobs('worker2', 1)
             assert len(acquired) == 1
             assert acquired[0].id == job.id
 
-    async def test_acquire_jobs_max_number_exceeded(
-            self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
-        async with datastore_cm as store:
-            await store.add_task(Task(id='task1', func=asynccontextmanager, max_running_jobs=2))
+    async def test_acquire_jobs_max_number_exceeded(self, datastore: AsyncDataStore) -> None:
+        async with datastore:
+            await datastore.add_task(
+                Task(id='task1', func=asynccontextmanager, max_running_jobs=2))
             jobs = [Job(task_id='task1'), Job(task_id='task1'), Job(task_id='task1')]
             for job in jobs:
-                await store.add_job(job)
+                await datastore.add_job(job)
 
             # Check that only 2 jobs are returned from acquire_jobs() even though the limit was 3
-            acquired_jobs = await store.acquire_jobs('worker1', 3)
+            acquired_jobs = await datastore.acquire_jobs('worker1', 3)
             assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
 
             # Release one job, and the worker should be able to acquire the third job
-            await store.release_job(
+            await datastore.release_job(
                 'worker1', acquired_jobs[0].task_id,
                 JobResult(job_id=acquired_jobs[0].id, outcome=JobOutcome.success,
                           return_value=None))
-            acquired_jobs = await store.acquire_jobs('worker1', 3)
+            acquired_jobs = await datastore.acquire_jobs('worker1', 3)
             assert [job.id for job in acquired_jobs] == [jobs[2].id]
--
cgit v1.2.1
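
For readers unfamiliar with the plugin this commit adopts: pytest-lazy-fixture lets fixture values be used as parametrization parameters and resolves them during test setup, so a parametrized fixture such as sync_store or datastore above receives the already-constructed store in request.param. Below is a minimal, self-contained sketch of that pattern; it is not part of the commit. The backend fixtures and the test are hypothetical stand-ins, pytest-lazy-fixture must be installed for pytest.lazy_fixture to be available, and the external_service marker is assumed to be registered in the project's pytest configuration.

    import pytest


    @pytest.fixture
    def memory_backend() -> dict:
        # Hypothetical stand-in for a cheap, always-available store (cf. memory_store above).
        return {'kind': 'memory'}


    @pytest.fixture
    def postgres_backend() -> dict:
        # Hypothetical stand-in for a store that needs an external service (cf. psycopg2_store above).
        return {'kind': 'postgres'}


    @pytest.fixture(params=[
        pytest.param(pytest.lazy_fixture('memory_backend'), id='memory'),
        pytest.param(pytest.lazy_fixture('postgres_backend'), id='postgres',
                     marks=[pytest.mark.external_service])
    ])
    def backend(request) -> dict:
        # pytest-lazy-fixture resolves the placeholder during setup, so request.param
        # is already the fixture's value here, not the lazy_fixture object.
        return request.param


    def test_backend_has_kind(backend: dict) -> None:
        # Runs once per parametrized backend; the postgres case carries the
        # external_service mark and can be deselected with -m "not external_service".
        assert 'kind' in backend

This mirrors how sync_store, async_store and datastore collapse the individual store fixtures into single parametrized fixtures, with the external-service backends marked so they can be skipped when those services are unavailable.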