diff options
Diffstat (limited to 'tests/test_workers.py')
-rw-r--r-- | tests/test_workers.py | 63 |
1 files changed, 19 insertions, 44 deletions
diff --git a/tests/test_workers.py b/tests/test_workers.py index 6e5568f..e1c7421 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -17,7 +17,6 @@ from apscheduler.events import ( JobAdded, JobReleased, TaskAdded, - WorkerStarted, WorkerStopped, ) from apscheduler.structures import Task @@ -55,26 +54,20 @@ class TestAsyncWorker: ) -> None: def listener(received_event: Event): received_events.append(received_event) - if len(received_events) == 5: + if isinstance(received_event, JobReleased): event.set() received_events: list[Event] = [] event = anyio.Event() - data_store = MemoryDataStore() - worker = AsyncWorker(data_store) - worker.events.subscribe(listener) - async with worker: + async with AsyncWorker(MemoryDataStore()) as worker: + worker.event_broker.subscribe(listener) await worker.data_store.add_task(Task(id="task_id", func=target_func)) job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail}) await worker.data_store.add_job(job) with fail_after(3): await event.wait() - # The worker was first started - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStarted) - - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "task_id" @@ -112,16 +105,14 @@ class TestAsyncWorker: async def test_run_deadline_missed(self) -> None: def listener(received_event: Event): received_events.append(received_event) - if len(received_events) == 5: + if isinstance(received_event, JobReleased): event.set() scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) received_events: list[Event] = [] event = anyio.Event() - data_store = MemoryDataStore() - worker = AsyncWorker(data_store) - worker.events.subscribe(listener) - async with worker: + async with AsyncWorker(MemoryDataStore()) as worker: + worker.event_broker.subscribe(listener) await worker.data_store.add_task(Task(id="task_id", func=fail_func)) job = Job( task_id="task_id", @@ -133,11 +124,7 @@ class TestAsyncWorker: with fail_after(3): await event.wait() - # The worker was first started - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStarted) - - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "task_id" @@ -175,25 +162,19 @@ class TestSyncWorker: def test_run_job_nonscheduled(self, fail: bool) -> None: def listener(received_event: Event): received_events.append(received_event) - if len(received_events) == 5: + if isinstance(received_event, JobReleased): event.set() received_events: list[Event] = [] event = threading.Event() - data_store = MemoryDataStore() - worker = Worker(data_store) - worker.events.subscribe(listener) - with worker: + with Worker(MemoryDataStore()) as worker: + worker.event_broker.subscribe(listener) worker.data_store.add_task(Task(id="task_id", func=sync_func)) job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail}) worker.data_store.add_job(job) - event.wait(5) - - # The worker was first started - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStarted) + event.wait(3) - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "task_id" @@ -229,18 +210,16 @@ class TestSyncWorker: assert not received_events def test_run_deadline_missed(self) -> None: - def listener(worker_event: Event): - received_events.append(worker_event) - if len(received_events) == 5: + def listener(received_event: Event): + received_events.append(received_event) + if isinstance(received_event, JobReleased): event.set() scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) received_events: list[Event] = [] event = threading.Event() - data_store = MemoryDataStore() - worker = Worker(data_store) - worker.events.subscribe(listener) - with worker: + with Worker(MemoryDataStore()) as worker: + worker.event_broker.subscribe(listener) worker.data_store.add_task(Task(id="task_id", func=fail_func)) job = Job( task_id="task_id", @@ -251,11 +230,7 @@ class TestSyncWorker: worker.data_store.add_job(job) event.wait(3) - # The worker was first started - received_event = received_events.pop(0) - assert isinstance(received_event, WorkerStarted) - - # Then the task was added + # First, a task was added received_event = received_events.pop(0) assert isinstance(received_event, TaskAdded) assert received_event.task_id == "task_id" |