diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 14:50:22 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 14:50:22 +0300 |
commit | 48722053dfb43de077df18a139abb16b0a7f7e24 (patch) | |
tree | bd55e709e0d4c02619ef0ec54390a8f792da2f74 /src/apscheduler/datastores | |
parent | a58fca290e0831d377d496a69101e5e3dc4c604e (diff) | |
download | apscheduler-48722053dfb43de077df18a139abb16b0a7f7e24.tar.gz |
Improved the event subscription system
The subscribe() method now returns a subscription which has the unsubscribe() method in itself.
Diffstat (limited to 'src/apscheduler/datastores')
-rw-r--r-- | src/apscheduler/datastores/async_sqlalchemy.py | 15 | ||||
-rw-r--r-- | src/apscheduler/datastores/mongodb.py | 13 | ||||
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py | 11 |
3 files changed, 9 insertions, 30 deletions
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py index a215d68..b15b154 100644 --- a/src/apscheduler/datastores/async_sqlalchemy.py +++ b/src/apscheduler/datastores/async_sqlalchemy.py @@ -2,7 +2,7 @@ from __future__ import annotations from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import Any, Callable, Iterable, Optional +from typing import Any, Iterable, Optional from uuid import UUID import attr @@ -19,9 +19,9 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule from ..enums import ConflictPolicy from ..eventbrokers.async_local import LocalAsyncEventBroker from ..events import ( - DataStoreEvent, Event, JobAdded, JobDeserializationFailed, ScheduleAdded, - ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded, - TaskRemoved, TaskUpdated) + DataStoreEvent, JobAdded, JobDeserializationFailed, ScheduleAdded, + ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, + TaskUpdated) from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError from ..marshalling import callable_to_ref from ..structures import JobResult, Task @@ -93,13 +93,6 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore): return jobs - def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self.events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self.events.unsubscribe(token) - async def add_task(self, task: Task) -> None: insert = self.t_tasks.insert().\ values(id=task.id, func=callable_to_ref(task.func), diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 6e7d0aa..ad4d568 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -4,7 +4,7 @@ from collections import defaultdict from contextlib import ExitStack from datetime import datetime, timezone from logging import Logger, getLogger -from typing import Any, Callable, ClassVar, Iterable, Optional +from typing import ClassVar, Iterable, Optional from uuid import UUID import attr @@ -18,8 +18,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer from ..enums import ConflictPolicy from ..eventbrokers.local import LocalEventBroker from ..events import ( - DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, - TaskAdded, TaskRemoved, TaskUpdated) + DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, + TaskRemoved, TaskUpdated) from ..exceptions import ( ConflictingIdError, DeserializationError, SerializationError, TaskLookupError) from ..serializers.pickle import PickleSerializer @@ -91,13 +91,6 @@ class MongoDBDataStore(DataStore): def __exit__(self, exc_type, exc_val, exc_tb): self._exit_stack.__exit__(exc_type, exc_val, exc_tb) - def subscribe(self, callback: Callable[[events.Event], Any], - event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: events.SubscriptionToken) -> None: - self._events.unsubscribe(token) - def add_task(self, task: Task) -> None: previous = self._tasks.find_one_and_update( {'_id': task.id}, diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 3040ae4..8dea821 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections import defaultdict from datetime import datetime, timedelta, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Optional +from typing import Any, Iterable, Optional from uuid import UUID import attr @@ -21,7 +21,7 @@ from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome from ..eventbrokers.local import LocalEventBroker from ..events import ( Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed, - ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded, TaskRemoved, TaskUpdated) + ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError from ..marshalling import callable_to_ref from ..serializers.pickle import PickleSerializer @@ -212,13 +212,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore): def events(self) -> EventSource: return self._events - def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self._events.unsubscribe(token) - def add_task(self, task: Task) -> None: insert = self.t_tasks.insert().\ values(id=task.id, func=callable_to_ref(task.func), |