From 8326ac378e5b5f8e5cb2c45f20e0e1bdfa5075c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 21 Sep 2021 00:21:25 +0300 Subject: Implemented schedule-level jitter Structures now keep enums, timedeltas and frozensets as-is. The MongoDB store was modified to use a custom type registry to handle this. --- tests/test_schedulers.py | 86 +++++++++++++++++++++++++++++++++++++++++++++++- tests/test_workers.py | 34 +++++++++++-------- 2 files changed, 106 insertions(+), 14 deletions(-) (limited to 'tests') diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 5f1a7df..240b7bf 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -1,10 +1,14 @@ +import sys import threading import time -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone +from typing import Optional +from uuid import UUID import anyio import pytest from anyio import fail_after +from pytest_mock import MockerFixture from apscheduler.enums import JobOutcome from apscheduler.events import ( @@ -13,6 +17,12 @@ from apscheduler.exceptions import JobLookupError from apscheduler.schedulers.async_ import AsyncScheduler from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.date import DateTrigger +from apscheduler.triggers.interval import IntervalTrigger + +if sys.version_info >= (3, 9): + from zoneinfo import ZoneInfo +else: + from backports.zoneinfo import ZoneInfo pytestmark = pytest.mark.anyio @@ -83,6 +93,44 @@ class TestAsyncScheduler: # There should be no more events on the list assert not received_events + @pytest.mark.parametrize('max_jitter, expected_upper_bound', [ + pytest.param(2, 2, id='within'), + pytest.param(4, 2.999999, id='exceed') + ]) + async def test_jitter(self, mocker: MockerFixture, timezone: ZoneInfo, max_jitter: float, + expected_upper_bound: float) -> None: + def job_added_listener(event: Event) -> None: + nonlocal job_id + assert isinstance(event, JobAdded) + job_id = event.job_id + job_added_event.set() + + jitter = 1.569374 + orig_start_time = datetime.now(timezone) - timedelta(seconds=1) + fake_uniform = mocker.patch('random.uniform') + fake_uniform.configure_mock(side_effect=lambda a, b: jitter) + async with AsyncScheduler(start_worker=False) as scheduler: + trigger = IntervalTrigger(seconds=3, start_time=orig_start_time) + job_added_event = anyio.Event() + job_id: Optional[UUID] = None + scheduler.events.subscribe(job_added_listener, {JobAdded}) + schedule_id = await scheduler.add_schedule(dummy_async_job, trigger, + max_jitter=max_jitter) + schedule = await scheduler.get_schedule(schedule_id) + assert schedule.max_jitter == timedelta(seconds=max_jitter) + + # Wait for the job to be added + with fail_after(3): + await job_added_event.wait() + + fake_uniform.assert_called_once_with(0, expected_upper_bound) + + # Check that the job was created with the proper amount of jitter in its scheduled time + jobs = await scheduler.data_store.get_jobs({job_id}) + assert jobs[0].jitter == timedelta(seconds=jitter) + assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(seconds=jitter) + assert jobs[0].original_scheduled_time == orig_start_time + async def test_get_job_result_success(self) -> None: async with AsyncScheduler() as scheduler: job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2}) @@ -165,6 +213,42 @@ class TestSyncScheduler: # There should be no more events on the list assert not received_events + @pytest.mark.parametrize('max_jitter, expected_upper_bound', [ + pytest.param(2, 2, id='within'), + pytest.param(4, 2.999999, id='exceed') + ]) + def test_jitter(self, mocker: MockerFixture, timezone: ZoneInfo, max_jitter: float, + expected_upper_bound: float) -> None: + def job_added_listener(event: Event) -> None: + nonlocal job_id + assert isinstance(event, JobAdded) + job_id = event.job_id + job_added_event.set() + + jitter = 1.569374 + orig_start_time = datetime.now(timezone) - timedelta(seconds=1) + fake_uniform = mocker.patch('random.uniform') + fake_uniform.configure_mock(side_effect=lambda a, b: jitter) + with Scheduler(start_worker=False) as scheduler: + trigger = IntervalTrigger(seconds=3, start_time=orig_start_time) + job_added_event = threading.Event() + job_id: Optional[UUID] = None + scheduler.events.subscribe(job_added_listener, {JobAdded}) + schedule_id = scheduler.add_schedule(dummy_async_job, trigger, max_jitter=max_jitter) + schedule = scheduler.get_schedule(schedule_id) + assert schedule.max_jitter == timedelta(seconds=max_jitter) + + # Wait for the job to be added + job_added_event.wait(3) + + fake_uniform.assert_called_once_with(0, expected_upper_bound) + + # Check that the job was created with the proper amount of jitter in its scheduled time + jobs = scheduler.data_store.get_jobs({job_id}) + assert jobs[0].jitter == timedelta(seconds=jitter) + assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(seconds=jitter) + assert jobs[0].original_scheduled_time == orig_start_time + def test_get_job_result(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job(dummy_sync_job) diff --git a/tests/test_workers.py b/tests/test_workers.py index 872cf34..f1f020e 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -77,8 +77,7 @@ class TestAsyncWorker: received_event = received_events.pop(0) assert isinstance(received_event, JobAcquired) assert received_event.job_id == job.id - assert received_event.task_id == 'task_id' - assert received_event.schedule_id is None + assert received_event.worker_id == worker.identity received_event = received_events.pop(0) if fail: @@ -100,7 +99,7 @@ class TestAsyncWorker: async def test_run_deadline_missed(self) -> None: def listener(received_event: Event): received_events.append(received_event) - if len(received_events) == 4: + if len(received_events) == 5: event.set() scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) @@ -134,13 +133,18 @@ class TestAsyncWorker: assert received_event.task_id == 'task_id' assert received_event.schedule_id == 'foo' - # Then the deadline was missed + # The worker acquired the job + received_event = received_events.pop(0) + assert isinstance(received_event, JobAcquired) + assert received_event.job_id == job.id + assert received_event.worker_id == worker.identity + + # The worker determined that the deadline has been missed received_event = received_events.pop(0) assert isinstance(received_event, JobReleased) assert received_event.outcome is JobOutcome.missed_start_deadline assert received_event.job_id == job.id - assert received_event.task_id == 'task_id' - assert received_event.schedule_id == 'foo' + assert received_event.worker_id == worker.identity # Finally, the worker was stopped received_event = received_events.pop(0) @@ -189,8 +193,7 @@ class TestSyncWorker: received_event = received_events.pop(0) assert isinstance(received_event, JobAcquired) assert received_event.job_id == job.id - assert received_event.task_id == 'task_id' - assert received_event.schedule_id is None + assert received_event.worker_id == worker.identity received_event = received_events.pop(0) if fail: @@ -212,7 +215,7 @@ class TestSyncWorker: def test_run_deadline_missed(self) -> None: def listener(worker_event: Event): received_events.append(worker_event) - if len(received_events) == 4: + if len(received_events) == 5: event.set() scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) @@ -227,7 +230,7 @@ class TestSyncWorker: scheduled_fire_time=scheduled_start_time, start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc)) worker.data_store.add_job(job) - event.wait(5) + event.wait(3) # The worker was first started received_event = received_events.pop(0) @@ -245,13 +248,18 @@ class TestSyncWorker: assert received_event.task_id == 'task_id' assert received_event.schedule_id == 'foo' - # Then the deadline was missed + # The worker acquired the job + received_event = received_events.pop(0) + assert isinstance(received_event, JobAcquired) + assert received_event.job_id == job.id + assert received_event.worker_id == worker.identity + + # The worker determined that the deadline has been missed received_event = received_events.pop(0) assert isinstance(received_event, JobReleased) assert received_event.outcome is JobOutcome.missed_start_deadline assert received_event.job_id == job.id - assert received_event.task_id == 'task_id' - assert received_event.schedule_id == 'foo' + assert received_event.worker_id == worker.identity # Finally, the worker was stopped received_event = received_events.pop(0) -- cgit v1.2.1