summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-08-13 23:06:08 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-08-13 23:06:08 +0300
commita9141bb5663e0a22cc7c4da7d34834c925dbadea (patch)
tree533ee4126b22d0b30b6c74106d7b4d3b89328757 /tests
parentffbbbbe0ee147bba08e0c95b3697136590997fb1 (diff)
downloadapscheduler-a9141bb5663e0a22cc7c4da7d34834c925dbadea.tar.gz
Added job expiration times
Scheduled jobs no longer retain their results. All job outcomes are now logged by the workers. Workers, rather than data stores, are now responsible for emitting the JobReleased event.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_datastores.py73
-rw-r--r--tests/test_schedulers.py63
2 files changed, 99 insertions, 37 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 22722c7..40fad27 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -3,7 +3,7 @@ from __future__ import annotations
import threading
from collections.abc import Generator
from contextlib import asynccontextmanager, contextmanager
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from tempfile import TemporaryDirectory
from typing import Any, AsyncGenerator, cast
@@ -433,7 +433,7 @@ class TestDataStores:
def test_job_release_success(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -443,9 +443,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -460,7 +460,7 @@ class TestDataStores:
def test_job_release_failure(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -470,9 +470,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -488,7 +488,7 @@ class TestDataStores:
def test_job_release_missed_deadline(self, datastore: DataStore):
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -498,7 +498,10 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -511,7 +514,7 @@ class TestDataStores:
def test_job_release_cancelled(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker1", 2)
@@ -521,7 +524,10 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.cancelled,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -576,9 +582,9 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
@@ -890,7 +896,7 @@ class TestAsyncDataStores:
async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -900,9 +906,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -917,7 +923,7 @@ class TestAsyncDataStores:
async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -927,9 +933,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -945,7 +951,7 @@ class TestAsyncDataStores:
async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -955,7 +961,10 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -968,7 +977,7 @@ class TestAsyncDataStores:
async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker1", 2)
@@ -978,7 +987,7 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(acquired[0], JobOutcome.cancelled),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -998,7 +1007,7 @@ class TestAsyncDataStores:
"""
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
# First, one worker acquires the first available job
@@ -1035,9 +1044,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 32f28dc..f57cbeb 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -17,6 +17,7 @@ from apscheduler import (
JobAdded,
JobLookupError,
JobOutcome,
+ JobReleased,
Schedule,
ScheduleAdded,
ScheduleLookupError,
@@ -181,16 +182,33 @@ class TestAsyncScheduler:
async def test_get_job_result_success(self) -> None:
async with AsyncScheduler() as scheduler:
- job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
+ job_id = await scheduler.add_job(
+ dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5
+ )
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
+ async def test_get_job_result_success_empty(self) -> None:
+ event = anyio.Event()
+ async with AsyncScheduler() as scheduler:
+ scheduler.event_broker.subscribe(
+ lambda evt: event.set(), {JobReleased}, one_shot=True
+ )
+ job_id = await scheduler.add_job(dummy_async_job)
+ with fail_after(3):
+ await event.wait()
+
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
async def test_get_job_result_error(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(
- dummy_async_job, kwargs={"delay": 0.2, "fail": True}
+ dummy_async_job,
+ kwargs={"delay": 0.2, "fail": True},
+ result_expiration_time=5,
)
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
@@ -198,6 +216,17 @@ class TestAsyncScheduler:
assert isinstance(result.exception, RuntimeError)
assert str(result.exception) == "failing as requested"
+ async def test_get_job_result_error_empty(self) -> None:
+ event = anyio.Event()
+ async with AsyncScheduler() as scheduler:
+ scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True)
+ job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ with fail_after(3):
+ await event.wait()
+
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
async def test_get_job_result_nowait_not_yet_ready(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
@@ -239,6 +268,7 @@ class TestAsyncScheduler:
jitter=timedelta(seconds=2.16),
start_deadline=start_deadline,
tags={"foo", "bar"},
+ result_expiration_time=timedelta(seconds=10),
)
await scheduler.data_store.add_job(job)
result = await scheduler.get_job_result(job.id)
@@ -371,17 +401,29 @@ class TestSyncScheduler:
)
assert jobs[0].original_scheduled_time == orig_start_time
- def test_get_job_result(self) -> None:
+ def test_get_job_result_success(self) -> None:
with Scheduler() as scheduler:
- job_id = scheduler.add_job(dummy_sync_job)
+ job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5)
result = scheduler.get_job_result(job_id)
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
+ def test_get_job_result_success_empty(self) -> None:
+ event = threading.Event()
+ with Scheduler() as scheduler:
+ with scheduler.event_broker.subscribe(
+ lambda evt: event.set(), {JobReleased}, one_shot=True
+ ):
+ job_id = scheduler.add_job(dummy_sync_job)
+ event.wait(3)
+
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
def test_get_job_result_error(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(
- dummy_sync_job, kwargs={"delay": 0.2, "fail": True}
+ dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5
)
result = scheduler.get_job_result(job_id)
assert result.job_id == job_id
@@ -389,6 +431,16 @@ class TestSyncScheduler:
assert isinstance(result.exception, RuntimeError)
assert str(result.exception) == "failing as requested"
+ def test_get_job_result_error_empty(self) -> None:
+ event = threading.Event()
+ with Scheduler() as scheduler, scheduler.event_broker.subscribe(
+ lambda evt: event.set(), one_shot=True
+ ):
+ job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ event.wait(3)
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
def test_get_job_result_nowait_not_yet_ready(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2})
@@ -428,6 +480,7 @@ class TestSyncScheduler:
jitter=timedelta(seconds=2.16),
start_deadline=start_deadline,
tags={"foo", "bar"},
+ result_expiration_time=timedelta(seconds=10),
)
scheduler.data_store.add_job(job)
result = scheduler.get_job_result(job.id)