diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-08-31 09:04:57 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-06 01:39:07 +0300 |
commit | dbe0f8bdd58cef5bd9060ed4c6f9eb5651ce09ad (patch) | |
tree | 848514069cb09aef702ef320a2697dcb1dd3897e /tests/test_datastores.py | |
parent | ff9fbf1bd54501f000a4d2043bdd673ba6bb5aa5 (diff) | |
download | apscheduler-dbe0f8bdd58cef5bd9060ed4c6f9eb5651ce09ad.tar.gz |
Added preliminary support for job result reporting and job cancellation
Diffstat (limited to 'tests/test_datastores.py')
-rw-r--r-- | tests/test_datastores.py | 100 |
1 files changed, 85 insertions, 15 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py index e0a9d26..ef86429 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -9,8 +9,10 @@ import pytest from freezegun.api import FrozenDateTimeFactory from apscheduler.abc import AsyncDataStore, Job, Schedule -from apscheduler.events import Event, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated +from apscheduler.enums import JobOutcome +from apscheduler.events import Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated from apscheduler.policies import CoalescePolicy, ConflictPolicy +from apscheduler.structures import JobResult from apscheduler.triggers.date import DateTrigger @@ -196,9 +198,9 @@ class TestAsyncStores: # assert len(acquired3) == 1 # assert acquired3[0].id == 's1' - async def test_acquire_release_jobs(self, datastore_cm: AsyncContextManager[AsyncDataStore], - jobs: List[Job]) -> None: - async with datastore_cm as store, capture_events(store, 0) as events: + async def test_acquire_release_multiple_workers( + self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]) -> None: + async with datastore_cm as store: for job in jobs: await store.add_job(job) @@ -213,21 +215,89 @@ class TestAsyncStores: assert jobs2[0].id == jobs[1].id # The third worker gets nothing - assert not await store.acquire_jobs('dummy-id3', 1) + jobs3 = await store.acquire_jobs('dummy-id3', 1) + assert not jobs3 + + async def test_job_release_success(self, datastore_cm: AsyncContextManager[AsyncDataStore], + jobs: List[Job]): + async with datastore_cm as store: + await store.add_job(jobs[0]) + + acquired = await store.acquire_jobs('worker_id', 2) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id + + await store.release_job('worker_id', acquired[0].id, + JobResult(JobOutcome.success, return_value='foo')) + result = await store.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.success + assert result.exception is None + assert result.return_value == 'foo' + + # Check that the job and its result are gone + assert not await store.get_jobs({acquired[0].id}) + assert not await store.get_job_result(acquired[0].id) + + async def test_job_release_failure(self, datastore_cm: AsyncContextManager[AsyncDataStore], + jobs: List[Job]): + async with datastore_cm as store: + await store.add_job(jobs[0]) + + acquired = await store.acquire_jobs('worker_id', 2) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id - # All the jobs should still be returned - visible_jobs = await store.get_jobs() - assert len(visible_jobs) == 2 + await store.release_job('worker_id', acquired[0].id, + JobResult(JobOutcome.failure, exception=ValueError('foo'))) + result = await store.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.failure + assert isinstance(result.exception, ValueError) + assert result.exception.args == ('foo',) + assert result.return_value is None - await store.release_jobs('dummy-id1', jobs1) - await store.release_jobs('dummy-id2', jobs2) + # Check that the job and its result are gone + assert not await store.get_jobs({acquired[0].id}) + assert not await store.get_job_result(acquired[0].id) + + async def test_job_release_missed_deadline( + self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]): + async with datastore_cm as store: + await store.add_job(jobs[0]) + + acquired = await store.acquire_jobs('worker_id', 2) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id + + await store.release_job('worker_id', acquired[0].id, + JobResult(JobOutcome.missed_start_deadline)) + result = await store.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.missed_start_deadline + assert result.exception is None + assert result.return_value is None + + # Check that the job and its result are gone + assert not await store.get_jobs({acquired[0].id}) + assert not await store.get_job_result(acquired[0].id) + + async def test_job_release_cancelled( + self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]): + async with datastore_cm as store: + await store.add_job(jobs[0]) + + acquired = await store.acquire_jobs('worker_id', 2) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id - # All the jobs should be gone - visible_jobs = await store.get_jobs() - assert len(visible_jobs) == 0 + await store.release_job('worker_id', acquired[0].id, + JobResult(JobOutcome.cancelled)) + result = await store.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.cancelled + assert result.exception is None + assert result.return_value is None - # Check for the appropriate events - assert all(isinstance(event, JobAdded) for event in events) + # Check that the job and its result are gone + assert not await store.get_jobs({acquired[0].id}) + assert not await store.get_job_result(acquired[0].id) async def test_acquire_jobs_lock_timeout( self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job], |