summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-21 00:21:25 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-21 00:21:25 +0300
commit8326ac378e5b5f8e5cb2c45f20e0e1bdfa5075c0 (patch)
treeac96ee8a1cbc792cf56cf3534f5e3c65f0b5a9e7 /tests
parent8b68b6c5d1c63faae1ba3769b6475b396328e3a3 (diff)
downloadapscheduler-8326ac378e5b5f8e5cb2c45f20e0e1bdfa5075c0.tar.gz
Implemented schedule-level jitter
Structures now keep enums, timedeltas and frozensets as-is. The MongoDB store was modified to use a custom type registry to handle this.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_schedulers.py86
-rw-r--r--tests/test_workers.py34
2 files changed, 106 insertions, 14 deletions
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 5f1a7df..240b7bf 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -1,10 +1,14 @@
+import sys
import threading
import time
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
+from typing import Optional
+from uuid import UUID
import anyio
import pytest
from anyio import fail_after
+from pytest_mock import MockerFixture
from apscheduler.enums import JobOutcome
from apscheduler.events import (
@@ -13,6 +17,12 @@ from apscheduler.exceptions import JobLookupError
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.date import DateTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+
+if sys.version_info >= (3, 9):
+ from zoneinfo import ZoneInfo
+else:
+ from backports.zoneinfo import ZoneInfo
pytestmark = pytest.mark.anyio
@@ -83,6 +93,44 @@ class TestAsyncScheduler:
# There should be no more events on the list
assert not received_events
+ @pytest.mark.parametrize('max_jitter, expected_upper_bound', [
+ pytest.param(2, 2, id='within'),
+ pytest.param(4, 2.999999, id='exceed')
+ ])
+ async def test_jitter(self, mocker: MockerFixture, timezone: ZoneInfo, max_jitter: float,
+ expected_upper_bound: float) -> None:
+ def job_added_listener(event: Event) -> None:
+ nonlocal job_id
+ assert isinstance(event, JobAdded)
+ job_id = event.job_id
+ job_added_event.set()
+
+ jitter = 1.569374
+ orig_start_time = datetime.now(timezone) - timedelta(seconds=1)
+ fake_uniform = mocker.patch('random.uniform')
+ fake_uniform.configure_mock(side_effect=lambda a, b: jitter)
+ async with AsyncScheduler(start_worker=False) as scheduler:
+ trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
+ job_added_event = anyio.Event()
+ job_id: Optional[UUID] = None
+ scheduler.events.subscribe(job_added_listener, {JobAdded})
+ schedule_id = await scheduler.add_schedule(dummy_async_job, trigger,
+ max_jitter=max_jitter)
+ schedule = await scheduler.get_schedule(schedule_id)
+ assert schedule.max_jitter == timedelta(seconds=max_jitter)
+
+ # Wait for the job to be added
+ with fail_after(3):
+ await job_added_event.wait()
+
+ fake_uniform.assert_called_once_with(0, expected_upper_bound)
+
+ # Check that the job was created with the proper amount of jitter in its scheduled time
+ jobs = await scheduler.data_store.get_jobs({job_id})
+ assert jobs[0].jitter == timedelta(seconds=jitter)
+ assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(seconds=jitter)
+ assert jobs[0].original_scheduled_time == orig_start_time
+
async def test_get_job_result_success(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2})
@@ -165,6 +213,42 @@ class TestSyncScheduler:
# There should be no more events on the list
assert not received_events
+ @pytest.mark.parametrize('max_jitter, expected_upper_bound', [
+ pytest.param(2, 2, id='within'),
+ pytest.param(4, 2.999999, id='exceed')
+ ])
+ def test_jitter(self, mocker: MockerFixture, timezone: ZoneInfo, max_jitter: float,
+ expected_upper_bound: float) -> None:
+ def job_added_listener(event: Event) -> None:
+ nonlocal job_id
+ assert isinstance(event, JobAdded)
+ job_id = event.job_id
+ job_added_event.set()
+
+ jitter = 1.569374
+ orig_start_time = datetime.now(timezone) - timedelta(seconds=1)
+ fake_uniform = mocker.patch('random.uniform')
+ fake_uniform.configure_mock(side_effect=lambda a, b: jitter)
+ with Scheduler(start_worker=False) as scheduler:
+ trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
+ job_added_event = threading.Event()
+ job_id: Optional[UUID] = None
+ scheduler.events.subscribe(job_added_listener, {JobAdded})
+ schedule_id = scheduler.add_schedule(dummy_async_job, trigger, max_jitter=max_jitter)
+ schedule = scheduler.get_schedule(schedule_id)
+ assert schedule.max_jitter == timedelta(seconds=max_jitter)
+
+ # Wait for the job to be added
+ job_added_event.wait(3)
+
+ fake_uniform.assert_called_once_with(0, expected_upper_bound)
+
+ # Check that the job was created with the proper amount of jitter in its scheduled time
+ jobs = scheduler.data_store.get_jobs({job_id})
+ assert jobs[0].jitter == timedelta(seconds=jitter)
+ assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(seconds=jitter)
+ assert jobs[0].original_scheduled_time == orig_start_time
+
def test_get_job_result(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(dummy_sync_job)
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 872cf34..f1f020e 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -77,8 +77,7 @@ class TestAsyncWorker:
received_event = received_events.pop(0)
assert isinstance(received_event, JobAcquired)
assert received_event.job_id == job.id
- assert received_event.task_id == 'task_id'
- assert received_event.schedule_id is None
+ assert received_event.worker_id == worker.identity
received_event = received_events.pop(0)
if fail:
@@ -100,7 +99,7 @@ class TestAsyncWorker:
async def test_run_deadline_missed(self) -> None:
def listener(received_event: Event):
received_events.append(received_event)
- if len(received_events) == 4:
+ if len(received_events) == 5:
event.set()
scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
@@ -134,13 +133,18 @@ class TestAsyncWorker:
assert received_event.task_id == 'task_id'
assert received_event.schedule_id == 'foo'
- # Then the deadline was missed
+ # The worker acquired the job
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAcquired)
+ assert received_event.job_id == job.id
+ assert received_event.worker_id == worker.identity
+
+ # The worker determined that the deadline has been missed
received_event = received_events.pop(0)
assert isinstance(received_event, JobReleased)
assert received_event.outcome is JobOutcome.missed_start_deadline
assert received_event.job_id == job.id
- assert received_event.task_id == 'task_id'
- assert received_event.schedule_id == 'foo'
+ assert received_event.worker_id == worker.identity
# Finally, the worker was stopped
received_event = received_events.pop(0)
@@ -189,8 +193,7 @@ class TestSyncWorker:
received_event = received_events.pop(0)
assert isinstance(received_event, JobAcquired)
assert received_event.job_id == job.id
- assert received_event.task_id == 'task_id'
- assert received_event.schedule_id is None
+ assert received_event.worker_id == worker.identity
received_event = received_events.pop(0)
if fail:
@@ -212,7 +215,7 @@ class TestSyncWorker:
def test_run_deadline_missed(self) -> None:
def listener(worker_event: Event):
received_events.append(worker_event)
- if len(received_events) == 4:
+ if len(received_events) == 5:
event.set()
scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
@@ -227,7 +230,7 @@ class TestSyncWorker:
scheduled_fire_time=scheduled_start_time,
start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc))
worker.data_store.add_job(job)
- event.wait(5)
+ event.wait(3)
# The worker was first started
received_event = received_events.pop(0)
@@ -245,13 +248,18 @@ class TestSyncWorker:
assert received_event.task_id == 'task_id'
assert received_event.schedule_id == 'foo'
- # Then the deadline was missed
+ # The worker acquired the job
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAcquired)
+ assert received_event.job_id == job.id
+ assert received_event.worker_id == worker.identity
+
+ # The worker determined that the deadline has been missed
received_event = received_events.pop(0)
assert isinstance(received_event, JobReleased)
assert received_event.outcome is JobOutcome.missed_start_deadline
assert received_event.job_id == job.id
- assert received_event.task_id == 'task_id'
- assert received_event.schedule_id == 'foo'
+ assert received_event.worker_id == worker.identity
# Finally, the worker was stopped
received_event = received_events.pop(0)