summaryrefslogtreecommitdiff
path: root/tests/test_workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_workers.py')
-rw-r--r--tests/test_workers.py63
1 files changed, 19 insertions, 44 deletions
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 6e5568f..e1c7421 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -17,7 +17,6 @@ from apscheduler.events import (
JobAdded,
JobReleased,
TaskAdded,
- WorkerStarted,
WorkerStopped,
)
from apscheduler.structures import Task
@@ -55,26 +54,20 @@ class TestAsyncWorker:
) -> None:
def listener(received_event: Event):
received_events.append(received_event)
- if len(received_events) == 5:
+ if isinstance(received_event, JobReleased):
event.set()
received_events: list[Event] = []
event = anyio.Event()
- data_store = MemoryDataStore()
- worker = AsyncWorker(data_store)
- worker.events.subscribe(listener)
- async with worker:
+ async with AsyncWorker(MemoryDataStore()) as worker:
+ worker.event_broker.subscribe(listener)
await worker.data_store.add_task(Task(id="task_id", func=target_func))
job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail})
await worker.data_store.add_job(job)
with fail_after(3):
await event.wait()
- # The worker was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStarted)
-
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "task_id"
@@ -112,16 +105,14 @@ class TestAsyncWorker:
async def test_run_deadline_missed(self) -> None:
def listener(received_event: Event):
received_events.append(received_event)
- if len(received_events) == 5:
+ if isinstance(received_event, JobReleased):
event.set()
scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
received_events: list[Event] = []
event = anyio.Event()
- data_store = MemoryDataStore()
- worker = AsyncWorker(data_store)
- worker.events.subscribe(listener)
- async with worker:
+ async with AsyncWorker(MemoryDataStore()) as worker:
+ worker.event_broker.subscribe(listener)
await worker.data_store.add_task(Task(id="task_id", func=fail_func))
job = Job(
task_id="task_id",
@@ -133,11 +124,7 @@ class TestAsyncWorker:
with fail_after(3):
await event.wait()
- # The worker was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStarted)
-
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "task_id"
@@ -175,25 +162,19 @@ class TestSyncWorker:
def test_run_job_nonscheduled(self, fail: bool) -> None:
def listener(received_event: Event):
received_events.append(received_event)
- if len(received_events) == 5:
+ if isinstance(received_event, JobReleased):
event.set()
received_events: list[Event] = []
event = threading.Event()
- data_store = MemoryDataStore()
- worker = Worker(data_store)
- worker.events.subscribe(listener)
- with worker:
+ with Worker(MemoryDataStore()) as worker:
+ worker.event_broker.subscribe(listener)
worker.data_store.add_task(Task(id="task_id", func=sync_func))
job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail})
worker.data_store.add_job(job)
- event.wait(5)
-
- # The worker was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStarted)
+ event.wait(3)
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "task_id"
@@ -229,18 +210,16 @@ class TestSyncWorker:
assert not received_events
def test_run_deadline_missed(self) -> None:
- def listener(worker_event: Event):
- received_events.append(worker_event)
- if len(received_events) == 5:
+ def listener(received_event: Event):
+ received_events.append(received_event)
+ if isinstance(received_event, JobReleased):
event.set()
scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
received_events: list[Event] = []
event = threading.Event()
- data_store = MemoryDataStore()
- worker = Worker(data_store)
- worker.events.subscribe(listener)
- with worker:
+ with Worker(MemoryDataStore()) as worker:
+ worker.event_broker.subscribe(listener)
worker.data_store.add_task(Task(id="task_id", func=fail_func))
job = Job(
task_id="task_id",
@@ -251,11 +230,7 @@ class TestSyncWorker:
worker.data_store.add_job(job)
event.wait(3)
- # The worker was first started
- received_event = received_events.pop(0)
- assert isinstance(received_event, WorkerStarted)
-
- # Then the task was added
+ # First, a task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
assert received_event.task_id == "task_id"