author     Alex Grönholm <alex.gronholm@nextday.fi>  2022-06-09 12:40:55 +0300
committer  Alex Grönholm <alex.gronholm@nextday.fi>  2022-07-19 00:48:51 +0300
commit     f215c1ab45959095f6b499eb7b26356c5937ee8b (patch)
tree       552687a0ed6e799b3da96eec5cd3fbb14d19f1b5 /src/apscheduler/datastores
parent     e3158fdf59a7c92a9449a566a2b746a4024e582f (diff)
download   apscheduler-f215c1ab45959095f6b499eb7b26356c5937ee8b.tar.gz
Added support for starting the sync scheduler (and worker) without the context manager
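For the data stores touched below, this replaces the __enter__/__exit__ (and __aenter__/__aexit__) context-manager protocol with explicit start(event_broker) and stop() methods, with the event broker now injected instead of owned by the store. A minimal sketch of the new synchronous lifecycle, using names taken from this diff; how the event broker itself is started and stopped is not shown here and is assumed to be handled by the caller (normally the scheduler):

    from apscheduler.datastores.memory import MemoryDataStore
    from apscheduler.eventbrokers.local import LocalEventBroker

    event_broker = LocalEventBroker()  # assumed to be managed by the caller/scheduler
    store = MemoryDataStore()
    store.start(event_broker)  # replaces the old "with store:" entry point
    try:
        ...  # add tasks/schedules, acquire jobs, etc.
    finally:
        store.stop()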
Diffstat (limited to 'src/apscheduler/datastores')
-rw-r--r--  src/apscheduler/datastores/async_adapter.py    | 42
-rw-r--r--  src/apscheduler/datastores/async_sqlalchemy.py | 24
-rw-r--r--  src/apscheduler/datastores/base.py             | 37
-rw-r--r--  src/apscheduler/datastores/memory.py           | 20
-rw-r--r--  src/apscheduler/datastores/mongodb.py          | 29
-rw-r--r--  src/apscheduler/datastores/sqlalchemy.py       | 24
6 files changed, 76 insertions, 100 deletions
diff --git a/src/apscheduler/datastores/async_adapter.py b/src/apscheduler/datastores/async_adapter.py
index 3b2342e..99accd4 100644
--- a/src/apscheduler/datastores/async_adapter.py
+++ b/src/apscheduler/datastores/async_adapter.py
@@ -1,8 +1,6 @@
 from __future__ import annotations
 
-from contextlib import AsyncExitStack
 from datetime import datetime
-from functools import partial
 from typing import Iterable
 from uuid import UUID
 
@@ -10,43 +8,35 @@ import attrs
 from anyio import to_thread
 from anyio.from_thread import BlockingPortal
 
-from ..abc import AsyncDataStore, AsyncEventBroker, DataStore, EventSource
+from ..abc import AsyncEventBroker, DataStore
 from ..enums import ConflictPolicy
-from ..eventbrokers.async_adapter import AsyncEventBrokerAdapter
+from ..eventbrokers.async_adapter import AsyncEventBrokerAdapter, SyncEventBrokerAdapter
 from ..structures import Job, JobResult, Schedule, Task
-from ..util import reentrant
+from .base import BaseAsyncDataStore
 
 
-@reentrant
 @attrs.define(eq=False)
-class AsyncDataStoreAdapter(AsyncDataStore):
+class AsyncDataStoreAdapter(BaseAsyncDataStore):
     original: DataStore
     _portal: BlockingPortal = attrs.field(init=False)
-    _events: AsyncEventBroker = attrs.field(init=False)
-    _exit_stack: AsyncExitStack = attrs.field(init=False)
 
-    @property
-    def events(self) -> EventSource:
-        return self._events
-
-    async def __aenter__(self) -> AsyncDataStoreAdapter:
-        self._exit_stack = AsyncExitStack()
+    async def start(self, event_broker: AsyncEventBroker) -> None:
+        await super().start(event_broker)
 
         self._portal = BlockingPortal()
-        await self._exit_stack.enter_async_context(self._portal)
-
-        self._events = AsyncEventBrokerAdapter(self.original.events, self._portal)
-        await self._exit_stack.enter_async_context(self._events)
+        await self._portal.__aenter__()
 
-        await to_thread.run_sync(self.original.__enter__)
-        self._exit_stack.push_async_exit(
-            partial(to_thread.run_sync, self.original.__exit__)
-        )
+        if isinstance(event_broker, AsyncEventBrokerAdapter):
+            sync_event_broker = event_broker.original
+        else:
+            sync_event_broker = SyncEventBrokerAdapter(event_broker, self._portal)
 
-        return self
+        await to_thread.run_sync(lambda: self.original.start(sync_event_broker))
 
-    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
+    async def stop(self, *, force: bool = False) -> None:
+        await to_thread.run_sync(lambda: self.original.stop(force=force))
+        await self._portal.__aexit__(None, None, None)
+        await super().stop(force=force)
 
     async def add_task(self, task: Task) -> None:
         await to_thread.run_sync(self.original.add_task, task)
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index aa56c6f..edf9b3e 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -17,9 +17,8 @@ from sqlalchemy.ext.asyncio.engine import AsyncEngine
 from sqlalchemy.sql.ddl import DropTable
 from sqlalchemy.sql.elements import BindParameter
 
-from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule
+from ..abc import AsyncEventBroker, Job, Schedule
 from ..enums import ConflictPolicy
-from ..eventbrokers.async_local import LocalAsyncEventBroker
 from ..events import (
     DataStoreEvent,
     JobAcquired,
@@ -36,17 +35,14 @@ from ..events import (
 from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
 from ..marshalling import callable_to_ref
 from ..structures import JobResult, Task
-from ..util import reentrant
+from .base import BaseAsyncDataStore
 from .sqlalchemy import _BaseSQLAlchemyDataStore
 
 
-@reentrant
 @attrs.define(eq=False)
-class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
+class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseAsyncDataStore):
     engine: AsyncEngine
 
-    _events: AsyncEventBroker = attrs.field(factory=LocalAsyncEventBroker)
-
     @classmethod
     def from_url(cls, url: str | URL, **options) -> AsyncSQLAlchemyDataStore:
         engine = create_async_engine(url, future=True)
@@ -63,7 +59,9 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
             reraise=True,
         )
 
-    async def __aenter__(self):
+    async def start(self, event_broker: AsyncEventBroker) -> None:
+        await super().start(event_broker)
+
         asynclib = sniffio.current_async_library() or "(unknown)"
         if asynclib != "asyncio":
             raise RuntimeError(
@@ -92,16 +90,6 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
                         f"only version 1 is supported by this version of APScheduler"
                     )
 
-        await self._events.__aenter__()
-        return self
-
-    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        await self._events.__aexit__(exc_type, exc_val, exc_tb)
-
-    @property
-    def events(self) -> EventSource:
-        return self._events
-
     async def _deserialize_schedules(self, result: Result) -> list[Schedule]:
         schedules: list[Schedule] = []
         for row in result:
diff --git a/src/apscheduler/datastores/base.py b/src/apscheduler/datastores/base.py
new file mode 100644
index 0000000..c05d28c
--- /dev/null
+++ b/src/apscheduler/datastores/base.py
@@ -0,0 +1,37 @@
+from __future__ import annotations
+
+from apscheduler.abc import (
+    AsyncDataStore,
+    AsyncEventBroker,
+    DataStore,
+    EventBroker,
+    EventSource,
+)
+
+
+class BaseDataStore(DataStore):
+    _events: EventBroker
+
+    def start(self, event_broker: EventBroker) -> None:
+        self._events = event_broker
+
+    def stop(self, *, force: bool = False) -> None:
+        del self._events
+
+    @property
+    def events(self) -> EventSource:
+        return self._events
+
+
+class BaseAsyncDataStore(AsyncDataStore):
+    _events: AsyncEventBroker
+
+    async def start(self, event_broker: AsyncEventBroker) -> None:
+        self._events = event_broker
+
+    async def stop(self, *, force: bool = False) -> None:
+        del self._events
+
+    @property
+    def events(self) -> EventSource:
+        return self._events
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index c2d7420..516d141 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -9,9 +9,8 @@ from uuid import UUID
 
 import attrs
 
-from ..abc import DataStore, EventBroker, EventSource, Job, Schedule
+from ..abc import Job, Schedule
 from ..enums import ConflictPolicy
-from ..eventbrokers.local import LocalEventBroker
 from ..events import (
     JobAcquired,
     JobAdded,
@@ -25,7 +24,7 @@ from ..events import (
 )
 from ..exceptions import ConflictingIdError, TaskLookupError
 from ..structures import JobResult, Task
-from ..util import reentrant
+from .base import BaseDataStore
 
 
 max_datetime = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)
@@ -83,11 +82,9 @@ class JobState:
         return hash(self.job.id)
 
 
-@reentrant
 @attrs.define(eq=False)
-class MemoryDataStore(DataStore):
+class MemoryDataStore(BaseDataStore):
     lock_expiration_delay: float = 30
-    _events: EventBroker = attrs.Factory(LocalEventBroker)
     _tasks: dict[str, TaskState] = attrs.Factory(dict)
     _schedules: list[ScheduleState] = attrs.Factory(list)
     _schedules_by_id: dict[str, ScheduleState] = attrs.Factory(dict)
@@ -111,17 +108,6 @@ class MemoryDataStore(DataStore):
         right_index = bisect_left(self._jobs, state)
         return self._jobs.index(state, left_index, right_index + 1)
 
-    def __enter__(self):
-        self._events.__enter__()
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self._events.__exit__(exc_type, exc_val, exc_tb)
-
-    @property
-    def events(self) -> EventSource:
-        return self._events
-
     def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
         return [
             state.schedule
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 2669d03..59a345a 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -2,7 +2,6 @@ from __future__ import annotations
 
 import operator
 from collections import defaultdict
-from contextlib import ExitStack
 from datetime import datetime, timedelta, timezone
 from logging import Logger, getLogger
 from typing import Any, Callable, ClassVar, Iterable
@@ -18,7 +17,7 @@ from pymongo import ASCENDING, DeleteOne, MongoClient, UpdateOne
 from pymongo.collection import Collection
 from pymongo.errors import ConnectionFailure, DuplicateKeyError
 
-from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
+from ..abc import EventBroker, Job, Schedule, Serializer
 from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
 from ..eventbrokers.local import LocalEventBroker
 from ..events import (
@@ -41,7 +40,7 @@ from ..exceptions import (
 )
 from ..serializers.pickle import PickleSerializer
 from ..structures import JobResult, RetrySettings, Task
-from ..util import reentrant
+from .base import BaseDataStore
 
 
 class CustomEncoder(TypeEncoder):
@@ -62,14 +61,14 @@ def ensure_uuid_presentation(client: MongoClient) -> None:
         pass
 
 
-@reentrant
 @attrs.define(eq=False)
-class MongoDBDataStore(DataStore):
+class MongoDBDataStore(BaseDataStore):
     client: MongoClient = attrs.field(validator=instance_of(MongoClient))
     serializer: Serializer = attrs.field(factory=PickleSerializer, kw_only=True)
+    event_broker = attrs.field(factory=LocalEventBroker, kw_only=True)
     database: str = attrs.field(default="apscheduler", kw_only=True)
     lock_expiration_delay: float = attrs.field(default=30, kw_only=True)
-    retry_settings: RetrySettings = attrs.field(default=RetrySettings())
+    retry_settings: RetrySettings = attrs.field(default=RetrySettings(), kw_only=True)
     start_from_scratch: bool = attrs.field(default=False, kw_only=True)
 
     _task_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Task)]
@@ -79,8 +78,6 @@ class MongoDBDataStore(DataStore):
     _job_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Job)]
 
     _logger: Logger = attrs.field(init=False, factory=lambda: getLogger(__name__))
-    _exit_stack: ExitStack = attrs.field(init=False, factory=ExitStack)
-    _events: EventBroker = attrs.field(init=False, factory=LocalEventBroker)
     _local_tasks: dict[str, Task] = attrs.field(init=False, factory=dict)
 
     def __attrs_post_init__(self) -> None:
@@ -108,10 +105,6 @@ class MongoDBDataStore(DataStore):
         client = MongoClient(uri)
         return cls(client, **options)
 
-    @property
-    def events(self) -> EventSource:
-        return self._events
-
     def _retry(self) -> tenacity.Retrying:
         return tenacity.Retrying(
             stop=self.retry_settings.stop,
@@ -128,7 +121,9 @@ class MongoDBDataStore(DataStore):
             retry_state.outcome.exception(),
         )
 
-    def __enter__(self):
+    def start(self, event_broker: EventBroker) -> None:
+        super().start(event_broker)
+
         server_info = self.client.server_info()
         if server_info["versionArray"] < [4, 0]:
             raise RuntimeError(
@@ -136,9 +131,6 @@ class MongoDBDataStore(DataStore):
                 f"{server_info['version']}"
             )
 
-        self._exit_stack.__enter__()
-        self._exit_stack.enter_context(self._events)
-
         for attempt in self._retry():
             with attempt, self.client.start_session() as session:
                 if self.start_from_scratch:
@@ -153,11 +145,6 @@ class MongoDBDataStore(DataStore):
                 self._jobs.create_index("tags", session=session)
                 self._jobs_results.create_index("finished_at", session=session)
 
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
-
     def add_task(self, task: Task) -> None:
         for attempt in self._retry():
             with attempt:
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index cca223e..9ee03fa 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -31,9 +31,8 @@ from sqlalchemy.future import Engine, create_engine
 from sqlalchemy.sql.ddl import DropTable
 from sqlalchemy.sql.elements import BindParameter, literal
 
-from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
+from ..abc import EventBroker, Job, Schedule, Serializer
 from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
-from ..eventbrokers.local import LocalEventBroker
 from ..events import (
     Event,
     JobAcquired,
@@ -52,7 +51,7 @@ from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
 from ..marshalling import callable_to_ref
 from ..serializers.pickle import PickleSerializer
 from ..structures import JobResult, RetrySettings, Task
-from ..util import reentrant
+from .base import BaseDataStore
 
 
 class EmulatedUUID(TypeDecorator):
@@ -219,13 +218,10 @@ class _BaseSQLAlchemyDataStore:
         return jobs
 
 
-@reentrant
 @attrs.define(eq=False)
-class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
+class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore):
     engine: Engine
 
-    _events: EventBroker = attrs.field(init=False, factory=LocalEventBroker)
-
    @classmethod
    def from_url(cls, url: str | URL, **options) -> SQLAlchemyDataStore:
        engine = create_engine(url)
@@ -240,7 +236,9 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
             reraise=True,
         )
 
-    def __enter__(self):
+    def start(self, event_broker: EventBroker) -> None:
+        super().start(event_broker)
+
         for attempt in self._retry():
             with attempt, self.engine.begin() as conn:
                 if self.start_from_scratch:
@@ -259,16 +257,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
                         f"only version 1 is supported by this version of APScheduler"
                     )
 
-        self._events.__enter__()
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self._events.__exit__(exc_type, exc_val, exc_tb)
-
-    @property
-    def events(self) -> EventSource:
-        return self._events
-
     def add_task(self, task: Task) -> None:
         insert = self.t_tasks.insert().values(
             id=task.id,
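The async side follows the same lifecycle through AsyncDataStoreAdapter, whose new start() runs the wrapped synchronous store's start() in a worker thread via a blocking portal. A sketch of that usage, assuming the event broker is set up and driven by the scheduler (its own start/stop handling is not part of this diff):

    import anyio

    from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
    from apscheduler.datastores.memory import MemoryDataStore
    from apscheduler.eventbrokers.async_local import LocalAsyncEventBroker

    async def main() -> None:
        # Wrap a synchronous data store so it can be driven from async code
        store = AsyncDataStoreAdapter(MemoryDataStore())
        event_broker = LocalAsyncEventBroker()  # assumed: started/stopped by the scheduler
        await store.start(event_broker)  # runs the wrapped store's start() in a worker thread
        try:
            ...  # use the store from an async scheduler/worker
        finally:
            await store.stop()

    anyio.run(main)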