diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-08-29 01:02:10 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-08-29 01:38:09 +0300 |
commit | b4d4724e95583b9f075a814319c3d5e8e5514a3e (patch) | |
tree | dd77fb25ded2ceb5a4f29221de69f19f469cfac0 /tests | |
parent | cf77aec5326e42af7b89e4ab2712daf9694ebad9 (diff) | |
download | apscheduler-b4d4724e95583b9f075a814319c3d5e8e5514a3e.tar.gz |
Overhauled the data store and event dispatch systems
Diffstat (limited to 'tests')
-rw-r--r-- | tests/conftest.py | 200 | ||||
-rw-r--r-- | tests/test_datastores.py | 412 | ||||
-rw-r--r-- | tests/test_events.py | 134 | ||||
-rw-r--r-- | tests/test_schedulers.py | 123 | ||||
-rw-r--r-- | tests/test_workers.py | 282 |
5 files changed, 729 insertions, 422 deletions
diff --git a/tests/conftest.py b/tests/conftest.py index bf6b975..c242b8d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,11 @@ import sys -from contextlib import AsyncExitStack, ExitStack -from functools import partial +from contextlib import asynccontextmanager, contextmanager +from typing import AsyncContextManager, AsyncGenerator, ContextManager, Generator, Optional import pytest -from anyio import start_blocking_portal -from apscheduler.datastores.memory import MemoryDataStore +from apscheduler.abc import AsyncDataStore, DataStore, Serializer +from apscheduler.adapters import AsyncDataStoreAdapter +from apscheduler.datastores.sync.memory import MemoryDataStore from apscheduler.serializers.cbor import CBORSerializer from apscheduler.serializers.json import JSONSerializer from apscheduler.serializers.pickle import PickleSerializer @@ -14,94 +15,135 @@ if sys.version_info >= (3, 9): else: from backports.zoneinfo import ZoneInfo -try: - from apscheduler.datastores.mongodb import MongoDBDataStore - from motor.motor_asyncio import AsyncIOMotorClient -except ImportError: - MongoDBDataStore = None - -try: - from apscheduler.datastores.postgresql import PostgresqlDataStore - from asyncpg import create_pool -except ImportError: - PostgresqlDataStore = None - -store_params = [ - pytest.param(MemoryDataStore, id='memory'), - pytest.param(PostgresqlDataStore, id='postgresql'), - pytest.param(MongoDBDataStore, id='mongodb') -] - @pytest.fixture(scope='session') -def timezone(): +def timezone() -> ZoneInfo: return ZoneInfo('Europe/Berlin') -@pytest.fixture(params=[None, PickleSerializer, CBORSerializer, JSONSerializer], - ids=['none', 'pickle', 'cbor', 'json']) -def serializer(request): +@pytest.fixture(params=[ + pytest.param(None, id='none'), + pytest.param(PickleSerializer, id='pickle'), + pytest.param(CBORSerializer, id='cbor'), + pytest.param(JSONSerializer, id='json') +]) +def serializer(request) -> Optional[Serializer]: return request.param() if request.param else None @pytest.fixture -def anyio_backend(): +def anyio_backend() -> 'str': return 'asyncio' -@pytest.fixture(params=store_params) -async def store(request): - async with AsyncExitStack() as stack: - if request.param is PostgresqlDataStore: - if PostgresqlDataStore is None: - pytest.skip('asyncpg not installed') - - pool = await create_pool('postgresql://postgres:secret@localhost/testdb', - min_size=1, max_size=2) - await stack.enter_async_context(pool) - store = PostgresqlDataStore(pool, start_from_scratch=True) - elif request.param is MongoDBDataStore: - if MongoDBDataStore is None: - pytest.skip('motor not installed') - - client = AsyncIOMotorClient(tz_aware=True) - stack.push(lambda *args: client.close()) - store = MongoDBDataStore(client, start_from_scratch=True) - else: - store = MemoryDataStore() - - await stack.enter_async_context(store) +@contextmanager +def setup_mongodb_store() -> Generator[DataStore, None, None]: + from apscheduler.datastores.sync.mongodb import MongoDBDataStore + from pymongo import MongoClient + from pymongo.errors import ConnectionFailure + + client = MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) + try: + client.admin.command('ismaster') + except ConnectionFailure: + pytest.skip('MongoDB server not available') + raise + + store = MongoDBDataStore(client, start_from_scratch=True) + with client, store: yield store -@pytest.fixture -def portal(): - with start_blocking_portal() as portal: - yield portal - - -@pytest.fixture(params=store_params) -def sync_store(request, portal): - with ExitStack() as stack: - if request.param is PostgresqlDataStore: - if PostgresqlDataStore is None: - pytest.skip('asyncpg not installed') - - pool = portal.call( - partial(create_pool, 'postgresql://postgres:secret@localhost/testdb', - min_size=1, max_size=2) - ) - stack.enter_context(portal.wrap_async_context_manager(pool)) - store = PostgresqlDataStore(pool, start_from_scratch=True) - elif request.param is MongoDBDataStore: - if MongoDBDataStore is None: - pytest.skip('motor not installed') - - client = portal.call(partial(AsyncIOMotorClient, tz_aware=True)) - stack.push(lambda *args: portal.call(client.close)) - store = MongoDBDataStore(client, start_from_scratch=True) - else: - store = MemoryDataStore() - - stack.enter_context(portal.wrap_async_context_manager(store)) +@contextmanager +def setup_memory_store() -> Generator[DataStore, None, None]: + with MemoryDataStore() as store: yield store + + +@asynccontextmanager +async def setup_postgresql_store() -> AsyncGenerator[AsyncDataStore, None]: + try: + from apscheduler.datastores.async_.postgresql import PostgresqlDataStore + from asyncpg import create_pool + except ModuleNotFoundError: + pytest.skip('asyncpg not installed') + raise + + pool = await create_pool('postgresql://postgres:secret@localhost/testdb', + min_size=1, max_size=2) + store = PostgresqlDataStore(pool, start_from_scratch=True) + async with pool, store: + yield store + + +@contextmanager +def setup_sqlalchemy_store() -> Generator[DataStore, None, None]: + try: + from apscheduler.datastores.sync.sqlalchemy import SQLAlchemyDataStore + from sqlalchemy import create_engine + except ModuleNotFoundError: + pytest.skip('sqlalchemy not installed') + raise + + engine = create_engine('postgresql+psycopg2://postgres:secret@localhost/testdb', future=True) + store = SQLAlchemyDataStore(engine, start_from_scratch=True) + try: + with store: + yield store + finally: + engine.dispose() + + +@asynccontextmanager +async def setup_async_sqlalchemy_store() -> AsyncGenerator[AsyncDataStore, None]: + try: + from apscheduler.datastores.async_.sqlalchemy import SQLAlchemyDataStore + from sqlalchemy.ext.asyncio import create_async_engine + except ModuleNotFoundError: + pytest.skip('sqlalchemy not installed') + raise + + engine = create_async_engine('postgresql+asyncpg://postgres:secret@localhost/testdb', + future=True) + store = SQLAlchemyDataStore(engine, start_from_scratch=True) + try: + async with store: + yield store + finally: + await engine.dispose() + + +@pytest.fixture(params=[ + pytest.param(setup_memory_store, id='memory'), + pytest.param(setup_mongodb_store, id='mongodb') +]) +def setup_sync_store(request) -> ContextManager[DataStore]: + return request.param + + +@pytest.fixture(params=[ + pytest.param(setup_postgresql_store, id='postgresql'), + pytest.param(setup_async_sqlalchemy_store, id='async_sqlalchemy') +]) +def setup_async_store(request) -> AsyncContextManager[AsyncDataStore]: + return request.param + + +@pytest.fixture(params=[ + pytest.param(setup_memory_store, id='memory'), + pytest.param(setup_mongodb_store, id='mongodb'), + pytest.param(setup_postgresql_store, id='postgresql'), + pytest.param(setup_async_sqlalchemy_store, id='async_sqlalchemy') +]) +def datastore_cm(request): + cm = request.param() + if isinstance(cm, AsyncContextManager): + return cm + + @asynccontextmanager + async def wrapper(): + with cm as store: + async with AsyncDataStoreAdapter(store) as adapter: + yield adapter + + return wrapper() diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 0e4fd68..0f10c61 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -1,27 +1,18 @@ +from __future__ import annotations + from datetime import datetime, timezone +from typing import AsyncContextManager, List import pytest -from anyio import move_on_after -from apscheduler.abc import Job, Schedule +from apscheduler.abc import AsyncDataStore, Job, Schedule from apscheduler.events import ScheduleAdded, ScheduleRemoved, ScheduleUpdated from apscheduler.policies import CoalescePolicy, ConflictPolicy from apscheduler.triggers.date import DateTrigger - -pytestmark = [ - pytest.mark.anyio, - pytest.mark.parametrize('anyio_backend', ['asyncio']) -] - - -@pytest.fixture -async def events(store): - events = [] - store.subscribe(events.append) - return events +from freezegun.api import FrozenDateTimeFactory @pytest.fixture -def schedules(): +def schedules() -> List[Schedule]: trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) schedule1 = Schedule(id='s1', task_id='bogus', trigger=trigger, args=(), kwargs={}, coalesce=CoalescePolicy.latest, misfire_grace_time=None, tags=frozenset()) @@ -39,7 +30,7 @@ def schedules(): @pytest.fixture -def jobs(): +def jobs() -> List[Job]: job1 = Job('task1', print, ('hello',), {'arg2': 'world'}, 'schedule1', datetime(2020, 10, 10, tzinfo=timezone.utc), datetime(2020, 10, 10, 1, tzinfo=timezone.utc), frozenset()) @@ -48,197 +39,198 @@ def jobs(): return [job1, job2] -async def test_add_schedules(store, schedules, events): - for schedule in schedules: - await store.add_schedule(schedule, ConflictPolicy.exception) - - assert await store.get_schedules() == schedules - assert await store.get_schedules({'s1', 's2', 's3'}) == schedules - assert await store.get_schedules({'s1'}) == [schedules[0]] - assert await store.get_schedules({'s2'}) == [schedules[1]] - assert await store.get_schedules({'s3'}) == [schedules[2]] - - assert len(events) == 3 - for event, schedule in zip(events, schedules): - assert isinstance(event, ScheduleAdded) - assert event.schedule_id == schedule.id - assert event.next_fire_time == schedule.next_fire_time - - -async def test_replace_schedules(store, schedules, events): - for schedule in schedules: - await store.add_schedule(schedule, ConflictPolicy.exception) - - events.clear() - next_fire_time = schedules[2].trigger.next() - schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(), - kwargs={}, coalesce=CoalescePolicy.earliest, misfire_grace_time=None, - tags=frozenset()) - schedule.next_fire_time = next_fire_time - await store.add_schedule(schedule, ConflictPolicy.replace) - - schedules = await store.get_schedules({schedule.id}) - assert schedules[0].task_id == 'foo' - assert schedules[0].next_fire_time == next_fire_time - assert schedules[0].args == () - assert schedules[0].kwargs == {} - assert schedules[0].coalesce is CoalescePolicy.earliest - assert schedules[0].misfire_grace_time is None - assert schedules[0].tags == frozenset() - - assert len(events) == 1 - assert isinstance(events[0], ScheduleUpdated) - assert events[0].schedule_id == 's3' - assert events[0].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) - - -async def test_remove_schedules(store, schedules, events): - for schedule in schedules: - await store.add_schedule(schedule, ConflictPolicy.exception) - - events.clear() - await store.remove_schedules(['s1', 's2']) - - assert len(events) == 2 - assert isinstance(events[0], ScheduleRemoved) - assert events[0].schedule_id == 's1' - assert isinstance(events[1], ScheduleRemoved) - assert events[1].schedule_id == 's2' - - assert await store.get_schedules() == [schedules[2]] - - -@pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc)) -async def test_acquire_release_schedules(store, schedules, events): - for schedule in schedules: - await store.add_schedule(schedule, ConflictPolicy.exception) - - events.clear() - - # The first scheduler gets the first due schedule - schedules1 = await store.acquire_schedules('dummy-id1', 1) - assert len(schedules1) == 1 - assert schedules1[0].id == 's1' - - # The second scheduler gets the second due schedule - schedules2 = await store.acquire_schedules('dummy-id2', 1) - assert len(schedules2) == 1 - assert schedules2[0].id == 's2' - - # The third scheduler gets nothing - async with move_on_after(0.2): - await store.acquire_schedules('dummy-id3', 1) - pytest.fail('The call should have timed out') - - # The schedules here have run their course, and releasing them should delete them - schedules1[0].next_fire_time = None - schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc) - await store.release_schedules('dummy-id1', schedules1) - await store.release_schedules('dummy-id2', schedules2) - - # Check that the first schedule is gone - schedules = await store.get_schedules() - assert len(schedules) == 2 - assert schedules[0].id == 's2' - assert schedules[1].id == 's3' - - # Check for the appropriate update and delete events - assert len(events) == 2 - assert isinstance(events[0], ScheduleRemoved) - assert isinstance(events[1], ScheduleUpdated) - assert events[0].schedule_id == 's1' - assert events[1].schedule_id == 's2' - assert events[1].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) - - -async def test_acquire_schedules_lock_timeout(store, schedules, events, freezer): - """ - Test that a scheduler can acquire schedules that were acquired by another scheduler but not - released within the lock timeout period. - - """ - # First, one scheduler acquires the first available schedule - await store.add_schedule(schedules[0], ConflictPolicy.exception) - acquired = await store.acquire_schedules('dummy-id1', 1) - assert len(acquired) == 1 - assert acquired[0].id == 's1' - - # Try to acquire the schedule just at the threshold (now == acquired_until). - # This should not yield any schedules. - freezer.tick(30) - async with move_on_after(0.2): - await store.acquire_schedules('dummy-id2', 1) - pytest.fail('The call should have timed out') - - # Right after that, the schedule should be available - freezer.tick(1) - acquired = await store.acquire_schedules('dummy-id2', 1) - assert len(acquired) == 1 - assert acquired[0].id == 's1' - - -async def test_acquire_release_jobs(store, jobs, events): - for job in jobs: - await store.add_job(job) - - events.clear() - - # The first worker gets the first job in the queue - jobs1 = await store.acquire_jobs('dummy-id1', 1) - assert len(jobs1) == 1 - assert jobs1[0].id == jobs[0].id - - # The second worker gets the second job - jobs2 = await store.acquire_jobs('dummy-id2', 1) - assert len(jobs2) == 1 - assert jobs2[0].id == jobs[1].id - - # The third worker gets nothing - async with move_on_after(0.2): - await store.acquire_jobs('dummy-id3', 1) - pytest.fail('The call should have timed out') - - # All the jobs should still be returned - visible_jobs = await store.get_jobs() - assert len(visible_jobs) == 2 - - await store.release_jobs('dummy-id1', jobs1) - await store.release_jobs('dummy-id2', jobs2) - - # All the jobs should be gone - visible_jobs = await store.get_jobs() - assert len(visible_jobs) == 0 - - # Check for the appropriate update and delete events - # assert len(events) == 2 - # assert isinstance(events[0], Job) - # assert isinstance(events[1], SchedulesUpdated) - # assert events[0].schedule_ids == {'s1'} - # assert events[1].schedule_ids == {'s2'} - # assert events[1].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) - - -async def test_acquire_jobs_lock_timeout(store, jobs, events, freezer): - """ - Test that a worker can acquire jobs that were acquired by another scheduler but not - released within the lock timeout period. - - """ - # First, one worker acquires the first available job - await store.add_job(jobs[0]) - acquired = await store.acquire_jobs('dummy-id1', 1) - assert len(acquired) == 1 - assert acquired[0].id == jobs[0].id - - # Try to acquire the job just at the threshold (now == acquired_until). - # This should not yield any jobs. - freezer.tick(30) - async with move_on_after(0.2): - await store.acquire_jobs('dummy-id2', 1) - pytest.fail('The call should have timed out') - - # Right after that, the job should be available - freezer.tick(1) - acquired = await store.acquire_jobs('dummy-id2', 1) - assert len(acquired) == 1 - assert acquired[0].id == jobs[0].id +@pytest.mark.anyio +class TestAsyncStores: + async def test_add_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore], + schedules: List[Schedule]) -> None: + events = [] + async with datastore_cm as store: + store.subscribe(events.append, [ScheduleAdded]) + for schedule in schedules: + await store.add_schedule(schedule, ConflictPolicy.exception) + + assert await store.get_schedules() == schedules + assert await store.get_schedules({'s1', 's2', 's3'}) == schedules + assert await store.get_schedules({'s1'}) == [schedules[0]] + assert await store.get_schedules({'s2'}) == [schedules[1]] + assert await store.get_schedules({'s3'}) == [schedules[2]] + + assert len(events) == 3 + add_events = [event for event in events if isinstance(event, ScheduleAdded)] + for event, schedule in zip(add_events, schedules): + assert event.schedule_id == schedule.id + assert event.next_fire_time == schedule.next_fire_time + + async def test_replace_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore], + schedules: List[Schedule]) -> None: + async with datastore_cm as store: + for schedule in schedules: + await store.add_schedule(schedule, ConflictPolicy.exception) + + events = [] + store.subscribe(events.append) + next_fire_time = schedules[2].trigger.next() + schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(), + kwargs={}, coalesce=CoalescePolicy.earliest, + misfire_grace_time=None, tags=frozenset()) + schedule.next_fire_time = next_fire_time + await store.add_schedule(schedule, ConflictPolicy.replace) + + schedules = await store.get_schedules({schedule.id}) + assert schedules[0].task_id == 'foo' + assert schedules[0].next_fire_time == next_fire_time + assert schedules[0].args == () + assert schedules[0].kwargs == {} + assert schedules[0].coalesce is CoalescePolicy.earliest + assert schedules[0].misfire_grace_time is None + assert schedules[0].tags == frozenset() + + assert len(events) == 1 + assert isinstance(events[0], ScheduleUpdated) + assert events[0].schedule_id == 's3' + assert events[0].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) + + async def test_remove_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore], + schedules: List[Schedule]) -> None: + events = [] + async with datastore_cm as store: + for schedule in schedules: + await store.add_schedule(schedule, ConflictPolicy.exception) + + store.subscribe(events.append) + await store.remove_schedules(['s1', 's2']) + assert await store.get_schedules() == [schedules[2]] + + assert len(events) == 2 + assert isinstance(events[0], ScheduleRemoved) + assert events[0].schedule_id == 's1' + assert isinstance(events[1], ScheduleRemoved) + assert events[1].schedule_id == 's2' + + @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc)) + async def test_acquire_release_schedules( + self, datastore_cm, schedules: List[Schedule]) -> None: + events = [] + async with datastore_cm as store: + for schedule in schedules: + await store.add_schedule(schedule, ConflictPolicy.exception) + + # The first scheduler gets the first due schedule + store.subscribe(events.append) + schedules1 = await store.acquire_schedules('dummy-id1', 1) + assert len(schedules1) == 1 + assert schedules1[0].id == 's1' + + # The second scheduler gets the second due schedule + schedules2 = await store.acquire_schedules('dummy-id2', 1) + assert len(schedules2) == 1 + assert schedules2[0].id == 's2' + + # The third scheduler gets nothing + assert not await store.acquire_schedules('dummy-id3', 1) + + # The schedules here have run their course, and releasing them should delete them + schedules1[0].next_fire_time = None + schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc) + await store.release_schedules('dummy-id1', schedules1) + await store.release_schedules('dummy-id2', schedules2) + + # Check that the first schedule is gone + schedules = await store.get_schedules() + assert len(schedules) == 2 + assert schedules[0].id == 's2' + assert schedules[1].id == 's3' + + # Check for the appropriate update and delete events + assert len(events) == 2 + assert isinstance(events[0], ScheduleRemoved) + assert isinstance(events[1], ScheduleUpdated) + assert events[0].schedule_id == 's1' + assert events[1].schedule_id == 's2' + assert events[1].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) + + async def test_acquire_schedules_lock_timeout( + self, datastore_cm, schedules: List[Schedule], freezer) -> None: + """ + Test that a scheduler can acquire schedules that were acquired by another scheduler but not + released within the lock timeout period. + + """ + async with datastore_cm as store: + # First, one scheduler acquires the first available schedule + await store.add_schedule(schedules[0], ConflictPolicy.exception) + acquired = await store.acquire_schedules('dummy-id1', 1) + assert len(acquired) == 1 + assert acquired[0].id == 's1' + + # Try to acquire the schedule just at the threshold (now == acquired_until). + # This should not yield any schedules. + freezer.tick(30) + assert not await store.acquire_schedules('dummy-id2', 1) + + # Right after that, the schedule should be available + freezer.tick(1) + acquired = await store.acquire_schedules('dummy-id2', 1) + assert len(acquired) == 1 + assert acquired[0].id == 's1' + + async def test_acquire_release_jobs(self, datastore_cm: AsyncContextManager[AsyncDataStore], + jobs: List[Job]) -> None: + events = [] + async with datastore_cm as store: + for job in jobs: + await store.add_job(job) + + # The first worker gets the first job in the queue + store.subscribe(events.append) + jobs1 = await store.acquire_jobs('dummy-id1', 1) + assert len(jobs1) == 1 + assert jobs1[0].id == jobs[0].id + + # The second worker gets the second job + jobs2 = await store.acquire_jobs('dummy-id2', 1) + assert len(jobs2) == 1 + assert jobs2[0].id == jobs[1].id + + # The third worker gets nothing + assert not await store.acquire_jobs('dummy-id3', 1) + + # All the jobs should still be returned + visible_jobs = await store.get_jobs() + assert len(visible_jobs) == 2 + + await store.release_jobs('dummy-id1', jobs1) + await store.release_jobs('dummy-id2', jobs2) + + # All the jobs should be gone + visible_jobs = await store.get_jobs() + assert len(visible_jobs) == 0 + + # Check for the appropriate events + assert not events + + async def test_acquire_jobs_lock_timeout( + self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job], + freezer: FrozenDateTimeFactory) -> None: + """ + Test that a worker can acquire jobs that were acquired by another scheduler but not + released within the lock timeout period. + + """ + async with datastore_cm as store: + # First, one worker acquires the first available job + await store.add_job(jobs[0]) + acquired = await store.acquire_jobs('dummy-id1', 1) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id + + # Try to acquire the job just at the threshold (now == acquired_until). + # This should not yield any jobs. + freezer.tick(30) + assert not await store.acquire_jobs('dummy-id2', 1) + + # Right after that, the job should be available + freezer.tick(1) + acquired = await store.acquire_jobs('dummy-id2', 1) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 0000000..3c9ebd6 --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,134 @@ +from datetime import datetime, timezone +from functools import partial +from operator import setitem +from typing import List, Optional + +import pytest +from _pytest.logging import LogCaptureFixture +from apscheduler.events import AsyncEventHub, Event, EventHub + + +class TestEventHub: + def test_publish(self) -> None: + timestamp = datetime.now(timezone.utc) + events: List[Optional[Event]] = [None, None] + with EventHub() as eventhub: + eventhub.subscribe(partial(setitem, events, 0)) + eventhub.subscribe(partial(setitem, events, 1)) + eventhub.publish(Event(timestamp=timestamp)) + + assert events[0] is events[1] + assert isinstance(events[0], Event) + assert events[0].timestamp == timestamp + + def test_unsubscribe(self) -> None: + timestamp = datetime.now(timezone.utc) + events = [] + with EventHub() as eventhub: + token = eventhub.subscribe(events.append) + eventhub.publish(Event(timestamp=timestamp)) + eventhub.unsubscribe(token) + eventhub.publish(Event(timestamp=timestamp)) + + assert len(events) == 1 + + def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None: + with EventHub() as eventhub: + eventhub.publish(Event(timestamp=datetime.now(timezone.utc))) + + assert not caplog.text + + def test_publish_exception(self, caplog: LogCaptureFixture) -> None: + def bad_subscriber(event: Event) -> None: + raise Exception('foo') + + timestamp = datetime.now(timezone.utc) + events = [] + with EventHub() as eventhub: + eventhub.subscribe(bad_subscriber) + eventhub.subscribe(events.append) + eventhub.publish(Event(timestamp=timestamp)) + + assert isinstance(events[0], Event) + assert events[0].timestamp == timestamp + assert 'Error delivering Event' in caplog.text + + def test_subscribe_coroutine_callback(self) -> None: + async def callback(event: Event) -> None: + pass + + with EventHub() as eventhub: + with pytest.raises(ValueError, match='Coroutine functions are not supported'): + eventhub.subscribe(callback) + + def test_relay_events(self) -> None: + timestamp = datetime.now(timezone.utc) + events = [] + with EventHub() as eventhub1, EventHub() as eventhub2: + eventhub2.relay_events_from(eventhub1) + eventhub2.subscribe(events.append) + eventhub1.publish(Event(timestamp=timestamp)) + + assert isinstance(events[0], Event) + assert events[0].timestamp == timestamp + + +@pytest.mark.anyio +class TestAsyncEventHub: + async def test_publish(self) -> None: + async def async_setitem(event: Event) -> None: + events[1] = event + + timestamp = datetime.now(timezone.utc) + events: List[Optional[Event]] = [None, None] + async with AsyncEventHub() as eventhub: + eventhub.subscribe(partial(setitem, events, 0)) + eventhub.subscribe(async_setitem) + eventhub.publish(Event(timestamp=timestamp)) + + assert events[0] is events[1] + assert isinstance(events[0], Event) + assert events[0].timestamp == timestamp + + async def test_unsubscribe(self) -> None: + timestamp = datetime.now(timezone.utc) + events = [] + async with AsyncEventHub() as eventhub: + token = eventhub.subscribe(events.append) + eventhub.publish(Event(timestamp=timestamp)) + eventhub.unsubscribe(token) + eventhub.publish(Event(timestamp=timestamp)) + + assert len(events) == 1 + + async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None: + async with AsyncEventHub() as eventhub: + eventhub.publish(Event(timestamp=datetime.now(timezone.utc))) + + assert not caplog.text + + async def test_publish_exception(self, caplog: LogCaptureFixture) -> None: + def bad_subscriber(event: Event) -> None: + raise Exception('foo') + + timestamp = datetime.now(timezone.utc) + events = [] + async with AsyncEventHub() as eventhub: + eventhub.subscribe(bad_subscriber) + eventhub.subscribe(events.append) + eventhub.publish(Event(timestamp=timestamp)) + + assert isinstance(events[0], Event) + assert events[0].timestamp == timestamp + assert 'Error delivering Event' in caplog.text + + async def test_relay_events(self) -> None: + timestamp = datetime.now(timezone.utc) + events = [] + async with AsyncEventHub() as eventhub1, AsyncEventHub() as eventhub2: + eventhub1.relay_events_from(eventhub2) + eventhub1.subscribe(events.append) + eventhub2.publish(Event(timestamp=timestamp)) + + assert isinstance(events[0], Event) + assert events[0].timestamp == timestamp diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 3e125c0..5829f30 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -1,10 +1,14 @@ -import logging +import threading from datetime import datetime, timezone +from typing import List +import anyio import pytest -from apscheduler.events import JobSuccessful +from anyio import fail_after +from apscheduler.events import ( + Event, JobAdded, ScheduleAdded, ScheduleRemoved, SchedulerStarted, SchedulerStopped) from apscheduler.schedulers.async_ import AsyncScheduler -from apscheduler.schedulers.sync import SyncScheduler +from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.date import DateTrigger pytestmark = pytest.mark.anyio @@ -19,40 +23,91 @@ def dummy_sync_job(): class TestAsyncScheduler: - async def test_schedule_job(self, caplog, store): - async def listener(event): - events.append(event) - if isinstance(event, JobSuccessful): - await scheduler.stop() + async def test_schedule_job(self) -> None: + def listener(received_event: Event) -> None: + received_events.append(received_event) + if len(received_events) == 4: + event.set() - caplog.set_level(logging.DEBUG) + received_events: List[Event] = [] + event = anyio.Event() + scheduler = AsyncScheduler(start_worker=False) + scheduler.subscribe(listener) trigger = DateTrigger(datetime.now(timezone.utc)) - events = [] - async with AsyncScheduler(store) as scheduler: - scheduler.worker.subscribe(listener) - await scheduler.add_schedule(dummy_async_job, trigger) - await scheduler.wait_until_stopped() + async with scheduler: + await scheduler.add_schedule(dummy_async_job, trigger, id='foo') + with fail_after(3): + await event.wait() - assert len(events) == 2 - assert isinstance(events[1], JobSuccessful) - assert events[1].return_value == 'returnvalue' + # The scheduler was first started + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStarted) + + # Then a schedule was added + received_event = received_events.pop(0) + assert isinstance(received_event, ScheduleAdded) + assert received_event.schedule_id == 'foo' + # assert received_event.task_id == 'task_id' + + # Then that schedule was processed and a job was added for it + received_event = received_events.pop(0) + assert isinstance(received_event, JobAdded) + assert received_event.schedule_id == 'foo' + assert received_event.task_id == 'test_schedulers:dummy_async_job' + + # Then the schedule was removed since the trigger had been exhausted + received_event = received_events.pop(0) + assert isinstance(received_event, ScheduleRemoved) + assert received_event.schedule_id == 'foo' + + # Finally, the scheduler was stopped + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStopped) + + # There should be no more events on the list + assert not received_events class TestSyncScheduler: - @pytest.mark.parametrize('anyio_backend', ['asyncio']) - def test_schedule_job(self, caplog, anyio_backend, sync_store, portal): - def listener(event): - events.append(event) - if isinstance(event, JobSuccessful): - scheduler.stop() - - caplog.set_level(logging.DEBUG) - events = [] - with SyncScheduler(sync_store, portal=portal) as scheduler: - scheduler.worker.subscribe(listener) - scheduler.add_schedule(dummy_sync_job, DateTrigger(datetime.now(timezone.utc))) - scheduler.wait_until_stopped() - - assert len(events) == 2 - assert isinstance(events[1], JobSuccessful) - assert events[1].return_value == 'returnvalue' + def test_schedule_job(self): + def listener(received_event: Event) -> None: + received_events.append(received_event) + if len(received_events) == 4: + event.set() + + received_events: List[Event] = [] + event = threading.Event() + scheduler = Scheduler(start_worker=False) + scheduler.subscribe(listener) + trigger = DateTrigger(datetime.now(timezone.utc)) + with scheduler: + scheduler.add_schedule(dummy_sync_job, trigger, id='foo') + event.wait(3) + + # The scheduler was first started + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStarted) + + # Then a schedule was added + received_event = received_events.pop(0) + assert isinstance(received_event, ScheduleAdded) + assert received_event.schedule_id == 'foo' + # assert received_event.task_id == 'task_id' + + # Then that schedule was processed and a job was added for it + received_event = received_events.pop(0) + assert isinstance(received_event, JobAdded) + assert received_event.schedule_id == 'foo' + assert received_event.task_id == 'test_schedulers:dummy_sync_job' + + # Then the schedule was removed since the trigger had been exhausted + received_event = received_events.pop(0) + assert isinstance(received_event, ScheduleRemoved) + assert received_event.schedule_id == 'foo' + + # Finally, the scheduler was stopped + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStopped) + + # There should be no more events on the list + assert not received_events diff --git a/tests/test_workers.py b/tests/test_workers.py index 18d24f6..9f098bc 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -1,12 +1,17 @@ import threading -from datetime import datetime +from datetime import datetime, timezone +from typing import Callable, List +import anyio import pytest -from anyio import Event, fail_after +from anyio import fail_after from apscheduler.abc import Job -from apscheduler.events import JobDeadlineMissed, JobFailed, JobSuccessful, JobUpdated +from apscheduler.datastores.sync.memory import MemoryDataStore +from apscheduler.events import ( + Event, JobAdded, JobCompleted, JobDeadlineMissed, JobFailed, JobStarted, WorkerStarted, + WorkerStopped) from apscheduler.workers.async_ import AsyncWorker -from apscheduler.workers.sync import SyncWorker +from apscheduler.workers.sync import Worker pytestmark = pytest.mark.anyio @@ -32,120 +37,199 @@ def fail_func(): class TestAsyncWorker: @pytest.mark.parametrize('target_func', [sync_func, async_func], ids=['sync', 'async']) @pytest.mark.parametrize('fail', [False, True], ids=['success', 'fail']) - @pytest.mark.parametrize('anyio_backend', ['asyncio']) - async def test_run_job_nonscheduled_success(self, target_func, fail, store): - async def listener(worker_event): - worker_events.append(worker_event) - if len(worker_events) == 2: + async def test_run_job_nonscheduled_success(self, target_func: Callable, fail: bool) -> None: + def listener(received_event: Event): + received_events.append(received_event) + if len(received_events) == 4: event.set() - worker_events = [] - event = Event() - job = Job('task_id', func=target_func, args=(1, 2), kwargs={'x': 'foo', 'fail': fail}) - async with AsyncWorker(store) as worker: - worker.subscribe(listener) - await store.add_job(job) - with fail_after(2): + received_events: List[Event] = [] + event = anyio.Event() + data_store = MemoryDataStore() + worker = AsyncWorker(data_store) + worker.subscribe(listener) + async with worker: + job = Job('task_id', func=target_func, args=(1, 2), kwargs={'x': 'foo', 'fail': fail}) + await worker.data_store.add_job(job) + with fail_after(3): await event.wait() - assert len(worker_events) == 2 - - assert isinstance(worker_events[0], JobUpdated) - assert worker_events[0].job_id == job.id - assert worker_events[0].task_id == 'task_id' - assert worker_events[0].schedule_id is None - - assert worker_events[1].job_id == job.id - assert worker_events[1].task_id == 'task_id' - assert worker_events[1].schedule_id is None + # The worker was first started + received_event = received_events.pop(0) + assert isinstance(received_event, WorkerStarted) + + # Then a job was added + received_event = received_events.pop(0) + assert isinstance(received_event, JobAdded) + assert received_event.job_id == job.id + assert received_event.task_id == 'task_id' + assert received_event.schedule_id is None + + # Then the job was started + received_event = received_events.pop(0) + assert isinstance(received_event, JobStarted) + assert received_event.job_id == job.id + assert received_event.task_id == 'task_id' + assert received_event.schedule_id is None + + received_event = received_events.pop(0) if fail: - assert isinstance(worker_events[1], JobFailed) - assert type(worker_events[1].exception) is Exception - assert isinstance(worker_events[1].traceback, str) + # Then the job failed + assert isinstance(received_event, JobFailed) + assert isinstance(received_event.exception, str) + assert isinstance(received_event.traceback, str) else: - assert isinstance(worker_events[1], JobSuccessful) - assert worker_events[1].return_value == ((1, 2), {'x': 'foo'}) - - @pytest.mark.parametrize('anyio_backend', ['asyncio']) - async def test_run_deadline_missed(self, store): - async def listener(worker_event): - worker_events.append(worker_event) - event.set() - - scheduled_start_time = datetime(2020, 9, 14) - worker_events = [] - event = Event() - job = Job('task_id', fail_func, args=(), kwargs={}, schedule_id='foo', - scheduled_fire_time=scheduled_start_time, - start_deadline=datetime(2020, 9, 14, 1)) - async with AsyncWorker(store) as worker: - worker.subscribe(listener) - await store.add_job(job) - with fail_after(5): + # Then the job finished successfully + assert isinstance(received_event, JobCompleted) + assert received_event.return_value == ((1, 2), {'x': 'foo'}) + + # Finally, the worker was stopped + received_event = received_events.pop(0) + assert isinstance(received_event, WorkerStopped) + + # There should be no more events on the list + assert not received_events + + async def test_run_deadline_missed(self) -> None: + def listener(received_event: Event): + received_events.append(received_event) + if len(received_events) == 3: + event.set() + + scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) + received_events: List[Event] = [] + event = anyio.Event() + data_store = MemoryDataStore() + worker = AsyncWorker(data_store) + worker.subscribe(listener) + async with worker: + job = Job('task_id', fail_func, args=(), kwargs={}, schedule_id='foo', + scheduled_fire_time=scheduled_start_time, + start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc)) + await worker.data_store.add_job(job) + with fail_after(3): await event.wait() - assert len(worker_events) == 1 - assert isinstance(worker_events[0], JobDeadlineMissed) - assert worker_events[0].job_id == job.id - assert worker_events[0].task_id == 'task_id' - assert worker_events[0].schedule_id == 'foo' - assert worker_events[0].scheduled_fire_time == scheduled_start_time + # The worker was first started + received_event = received_events.pop(0) + assert isinstance(received_event, WorkerStarted) + + # Then a job was added + received_event = received_events.pop(0) + assert isinstance(received_event, JobAdded) + assert received_event.job_id == job.id + assert received_event.task_id == 'task_id' + assert received_event.schedule_id == 'foo' + + # Then the deadline was missed + received_event = received_events.pop(0) + assert isinstance(received_event, JobDeadlineMissed) + assert received_event.job_id == job.id + assert received_event.task_id == 'task_id' + assert received_event.schedule_id == 'foo' + + # Finally, the worker was stopped + received_event = received_events.pop(0) + assert isinstance(received_event, WorkerStopped) + + # There should be no more events on the list + assert not received_events class TestSyncWorker: - @pytest.mark.parametrize('target_func', [sync_func, async_func], ids=['sync', 'async']) @pytest.mark.parametrize('fail', [False, True], ids=['success', 'fail']) - def test_run_job_nonscheduled(self, anyio_backend, target_func, fail, sync_store, portal): - def listener(worker_event): - print('received event:', worker_event) - worker_events.append(worker_event) - if len(worker_events) == 2: + def test_run_job_nonscheduled(self, fail: bool) -> None: + def listener(received_event: Event): + received_events.append(received_event) + if len(received_events) == 4: event.set() - worker_events = [] + received_events: List[Event] = [] event = threading.Event() - job = Job('task_id', func=target_func, args=(1, 2), kwargs={'x': 'foo', 'fail': fail}) - with SyncWorker(sync_store, portal=portal) as worker: - worker.subscribe(listener) - portal.call(sync_store.add_job, job) - event.wait(2) - - assert len(worker_events) == 2 - - assert isinstance(worker_events[0], JobUpdated) - assert worker_events[0].job_id == job.id - assert worker_events[0].task_id == 'task_id' - assert worker_events[0].schedule_id is None - - assert worker_events[1].job_id == job.id - assert worker_events[1].task_id == 'task_id' - assert worker_events[1].schedule_id is None + data_store = MemoryDataStore() + worker = Worker(data_store) + worker.subscribe(listener) + with worker: + job = Job('task_id', func=sync_func, args=(1, 2), kwargs={'x': 'foo', 'fail': fail}) + worker.data_store.add_job(job) + event.wait(5) + + # The worker was first started + received_event = received_events.pop(0) + assert isinstance(received_event, WorkerStarted) + + # Then a job was added + received_event = received_events.pop(0) + assert isinstance(received_event, JobAdded) + assert received_event.job_id == job.id + assert received_event.task_id == 'task_id' + assert received_event.schedule_id is None + + # Then the job was started + received_event = received_events.pop(0) + assert isinstance(received_event, JobStarted) + assert received_event.job_id == job.id + assert received_event.task_id == 'task_id' + assert received_event.schedule_id is None + + received_event = received_events.pop(0) if fail: - assert isinstance(worker_events[1], JobFailed) - assert type(worker_events[1].exception) is Exception - assert isinstance(worker_events[1].traceback, str) + # Then the job failed + assert isinstance(received_event, JobFailed) + assert isinstance(received_event.exception, str) + assert isinstance(received_event.traceback, str) else: - assert isinstance(worker_events[1], JobSuccessful) - assert worker_events[1].return_value == ((1, 2), {'x': 'foo'}) + # Then the job finished successfully + assert isinstance(received_event, JobCompleted) + assert received_event.return_value == ((1, 2), {'x': 'foo'}) + + # Finally, the worker was stopped + received_event = received_events.pop(0) + assert isinstance(received_event, WorkerStopped) - def test_run_deadline_missed(self, anyio_backend, sync_store, portal): - def listener(worker_event): - worker_events.append(worker_event) - event.set() + # There should be no more events on the list + assert not received_events + + def test_run_deadline_missed(self) -> None: + def listener(worker_event: Event): + received_events.append(worker_event) + if len(received_events) == 3: + event.set() - scheduled_start_time = datetime(2020, 9, 14) - worker_events = [] + scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc) + received_events: List[Event] = [] event = threading.Event() - job = Job('task_id', fail_func, args=(), kwargs={}, schedule_id='foo', - scheduled_fire_time=scheduled_start_time, - start_deadline=datetime(2020, 9, 14, 1)) - with SyncWorker(sync_store, portal=portal) as worker: - worker.subscribe(listener) - portal.call(sync_store.add_job, job) + data_store = MemoryDataStore() + worker = Worker(data_store) + worker.subscribe(listener) + with worker: + job = Job('task_id', fail_func, args=(), kwargs={}, schedule_id='foo', + scheduled_fire_time=scheduled_start_time, + start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc)) + worker.data_store.add_job(job) event.wait(5) - assert len(worker_events) == 1 - assert isinstance(worker_events[0], JobDeadlineMissed) - assert worker_events[0].job_id == job.id - assert worker_events[0].task_id == 'task_id' - assert worker_events[0].schedule_id == 'foo' + # The worker was first started + received_event = received_events.pop(0) + assert isinstance(received_event, WorkerStarted) + + # Then a job was added + received_event = received_events.pop(0) + assert isinstance(received_event, JobAdded) + assert received_event.job_id == job.id + assert received_event.task_id == 'task_id' + assert received_event.schedule_id == 'foo' + + # Then the deadline was missed + received_event = received_events.pop(0) + assert isinstance(received_event, JobDeadlineMissed) + assert received_event.job_id == job.id + assert received_event.task_id == 'task_id' + assert received_event.schedule_id == 'foo' + + # Finally, the worker was stopped + received_event = received_events.pop(0) + assert isinstance(received_event, WorkerStopped) + + # There should be no more events on the list + assert not received_events |