summaryrefslogtreecommitdiff
path: root/src/apscheduler/datastores
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 14:50:22 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 14:50:22 +0300
commit48722053dfb43de077df18a139abb16b0a7f7e24 (patch)
treebd55e709e0d4c02619ef0ec54390a8f792da2f74 /src/apscheduler/datastores
parenta58fca290e0831d377d496a69101e5e3dc4c604e (diff)
downloadapscheduler-48722053dfb43de077df18a139abb16b0a7f7e24.tar.gz
Improved the event subscription system
The subscribe() method now returns a subscription which has the unsubscribe() method in itself.
Diffstat (limited to 'src/apscheduler/datastores')
-rw-r--r--src/apscheduler/datastores/async_sqlalchemy.py15
-rw-r--r--src/apscheduler/datastores/mongodb.py13
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py11
3 files changed, 9 insertions, 30 deletions
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index a215d68..b15b154 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -2,7 +2,7 @@ from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta, timezone
-from typing import Any, Callable, Iterable, Optional
+from typing import Any, Iterable, Optional
from uuid import UUID
import attr
@@ -19,9 +19,9 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule
from ..enums import ConflictPolicy
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- DataStoreEvent, Event, JobAdded, JobDeserializationFailed, ScheduleAdded,
- ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded,
- TaskRemoved, TaskUpdated)
+ DataStoreEvent, JobAdded, JobDeserializationFailed, ScheduleAdded,
+ ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved,
+ TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..structures import JobResult, Task
@@ -93,13 +93,6 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
return jobs
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self.events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self.events.unsubscribe(token)
-
async def add_task(self, task: Task) -> None:
insert = self.t_tasks.insert().\
values(id=task.id, func=callable_to_ref(task.func),
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 6e7d0aa..ad4d568 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -4,7 +4,7 @@ from collections import defaultdict
from contextlib import ExitStack
from datetime import datetime, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, ClassVar, Iterable, Optional
+from typing import ClassVar, Iterable, Optional
from uuid import UUID
import attr
@@ -18,8 +18,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
from ..enums import ConflictPolicy
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, SubscriptionToken,
- TaskAdded, TaskRemoved, TaskUpdated)
+ DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded,
+ TaskRemoved, TaskUpdated)
from ..exceptions import (
ConflictingIdError, DeserializationError, SerializationError, TaskLookupError)
from ..serializers.pickle import PickleSerializer
@@ -91,13 +91,6 @@ class MongoDBDataStore(DataStore):
def __exit__(self, exc_type, exc_val, exc_tb):
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
- def subscribe(self, callback: Callable[[events.Event], Any],
- event_types: Optional[Iterable[type[events.Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: events.SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
def add_task(self, task: Task) -> None:
previous = self._tasks.find_one_and_update(
{'_id': task.id},
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 3040ae4..8dea821 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -3,7 +3,7 @@ from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Optional
+from typing import Any, Iterable, Optional
from uuid import UUID
import attr
@@ -21,7 +21,7 @@ from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
from ..eventbrokers.local import LocalEventBroker
from ..events import (
Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed,
- ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded, TaskRemoved, TaskUpdated)
+ ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..serializers.pickle import PickleSerializer
@@ -212,13 +212,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
def events(self) -> EventSource:
return self._events
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
def add_task(self, task: Task) -> None:
insert = self.t_tasks.insert().\
values(id=task.id, func=callable_to_ref(task.func),