summaryrefslogtreecommitdiff
path: root/src/apscheduler/datastores
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-06-09 12:40:55 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-07-19 00:48:51 +0300
commitf215c1ab45959095f6b499eb7b26356c5937ee8b (patch)
tree552687a0ed6e799b3da96eec5cd3fbb14d19f1b5 /src/apscheduler/datastores
parente3158fdf59a7c92a9449a566a2b746a4024e582f (diff)
downloadapscheduler-f215c1ab45959095f6b499eb7b26356c5937ee8b.tar.gz
Added support for starting the sync scheduler (and worker) without the context manager
Diffstat (limited to 'src/apscheduler/datastores')
-rw-r--r--src/apscheduler/datastores/async_adapter.py42
-rw-r--r--src/apscheduler/datastores/async_sqlalchemy.py24
-rw-r--r--src/apscheduler/datastores/base.py37
-rw-r--r--src/apscheduler/datastores/memory.py20
-rw-r--r--src/apscheduler/datastores/mongodb.py29
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py24
6 files changed, 76 insertions, 100 deletions
diff --git a/src/apscheduler/datastores/async_adapter.py b/src/apscheduler/datastores/async_adapter.py
index 3b2342e..99accd4 100644
--- a/src/apscheduler/datastores/async_adapter.py
+++ b/src/apscheduler/datastores/async_adapter.py
@@ -1,8 +1,6 @@
from __future__ import annotations
-from contextlib import AsyncExitStack
from datetime import datetime
-from functools import partial
from typing import Iterable
from uuid import UUID
@@ -10,43 +8,35 @@ import attrs
from anyio import to_thread
from anyio.from_thread import BlockingPortal
-from ..abc import AsyncDataStore, AsyncEventBroker, DataStore, EventSource
+from ..abc import AsyncEventBroker, DataStore
from ..enums import ConflictPolicy
-from ..eventbrokers.async_adapter import AsyncEventBrokerAdapter
+from ..eventbrokers.async_adapter import AsyncEventBrokerAdapter, SyncEventBrokerAdapter
from ..structures import Job, JobResult, Schedule, Task
-from ..util import reentrant
+from .base import BaseAsyncDataStore
-@reentrant
@attrs.define(eq=False)
-class AsyncDataStoreAdapter(AsyncDataStore):
+class AsyncDataStoreAdapter(BaseAsyncDataStore):
original: DataStore
_portal: BlockingPortal = attrs.field(init=False)
- _events: AsyncEventBroker = attrs.field(init=False)
- _exit_stack: AsyncExitStack = attrs.field(init=False)
- @property
- def events(self) -> EventSource:
- return self._events
-
- async def __aenter__(self) -> AsyncDataStoreAdapter:
- self._exit_stack = AsyncExitStack()
+ async def start(self, event_broker: AsyncEventBroker) -> None:
+ await super().start(event_broker)
self._portal = BlockingPortal()
- await self._exit_stack.enter_async_context(self._portal)
-
- self._events = AsyncEventBrokerAdapter(self.original.events, self._portal)
- await self._exit_stack.enter_async_context(self._events)
+ await self._portal.__aenter__()
- await to_thread.run_sync(self.original.__enter__)
- self._exit_stack.push_async_exit(
- partial(to_thread.run_sync, self.original.__exit__)
- )
+ if isinstance(event_broker, AsyncEventBrokerAdapter):
+ sync_event_broker = event_broker.original
+ else:
+ sync_event_broker = SyncEventBrokerAdapter(event_broker, self._portal)
- return self
+ await to_thread.run_sync(lambda: self.original.start(sync_event_broker))
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
+ async def stop(self, *, force: bool = False) -> None:
+ await to_thread.run_sync(lambda: self.original.stop(force=force))
+ await self._portal.__aexit__(None, None, None)
+ await super().stop(force=force)
async def add_task(self, task: Task) -> None:
await to_thread.run_sync(self.original.add_task, task)
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index aa56c6f..edf9b3e 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -17,9 +17,8 @@ from sqlalchemy.ext.asyncio.engine import AsyncEngine
from sqlalchemy.sql.ddl import DropTable
from sqlalchemy.sql.elements import BindParameter
-from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule
+from ..abc import AsyncEventBroker, Job, Schedule
from ..enums import ConflictPolicy
-from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
DataStoreEvent,
JobAcquired,
@@ -36,17 +35,14 @@ from ..events import (
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..structures import JobResult, Task
-from ..util import reentrant
+from .base import BaseAsyncDataStore
from .sqlalchemy import _BaseSQLAlchemyDataStore
-@reentrant
@attrs.define(eq=False)
-class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
+class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseAsyncDataStore):
engine: AsyncEngine
- _events: AsyncEventBroker = attrs.field(factory=LocalAsyncEventBroker)
-
@classmethod
def from_url(cls, url: str | URL, **options) -> AsyncSQLAlchemyDataStore:
engine = create_async_engine(url, future=True)
@@ -63,7 +59,9 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
reraise=True,
)
- async def __aenter__(self):
+ async def start(self, event_broker: AsyncEventBroker) -> None:
+ await super().start(event_broker)
+
asynclib = sniffio.current_async_library() or "(unknown)"
if asynclib != "asyncio":
raise RuntimeError(
@@ -92,16 +90,6 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
f"only version 1 is supported by this version of APScheduler"
)
- await self._events.__aenter__()
- return self
-
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self._events.__aexit__(exc_type, exc_val, exc_tb)
-
- @property
- def events(self) -> EventSource:
- return self._events
-
async def _deserialize_schedules(self, result: Result) -> list[Schedule]:
schedules: list[Schedule] = []
for row in result:
diff --git a/src/apscheduler/datastores/base.py b/src/apscheduler/datastores/base.py
new file mode 100644
index 0000000..c05d28c
--- /dev/null
+++ b/src/apscheduler/datastores/base.py
@@ -0,0 +1,37 @@
+from __future__ import annotations
+
+from apscheduler.abc import (
+ AsyncDataStore,
+ AsyncEventBroker,
+ DataStore,
+ EventBroker,
+ EventSource,
+)
+
+
+class BaseDataStore(DataStore):
+ _events: EventBroker
+
+ def start(self, event_broker: EventBroker) -> None:
+ self._events = event_broker
+
+ def stop(self, *, force: bool = False) -> None:
+ del self._events
+
+ @property
+ def events(self) -> EventSource:
+ return self._events
+
+
+class BaseAsyncDataStore(AsyncDataStore):
+ _events: AsyncEventBroker
+
+ async def start(self, event_broker: AsyncEventBroker) -> None:
+ self._events = event_broker
+
+ async def stop(self, *, force: bool = False) -> None:
+ del self._events
+
+ @property
+ def events(self) -> EventSource:
+ return self._events
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index c2d7420..516d141 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -9,9 +9,8 @@ from uuid import UUID
import attrs
-from ..abc import DataStore, EventBroker, EventSource, Job, Schedule
+from ..abc import Job, Schedule
from ..enums import ConflictPolicy
-from ..eventbrokers.local import LocalEventBroker
from ..events import (
JobAcquired,
JobAdded,
@@ -25,7 +24,7 @@ from ..events import (
)
from ..exceptions import ConflictingIdError, TaskLookupError
from ..structures import JobResult, Task
-from ..util import reentrant
+from .base import BaseDataStore
max_datetime = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)
@@ -83,11 +82,9 @@ class JobState:
return hash(self.job.id)
-@reentrant
@attrs.define(eq=False)
-class MemoryDataStore(DataStore):
+class MemoryDataStore(BaseDataStore):
lock_expiration_delay: float = 30
- _events: EventBroker = attrs.Factory(LocalEventBroker)
_tasks: dict[str, TaskState] = attrs.Factory(dict)
_schedules: list[ScheduleState] = attrs.Factory(list)
_schedules_by_id: dict[str, ScheduleState] = attrs.Factory(dict)
@@ -111,17 +108,6 @@ class MemoryDataStore(DataStore):
right_index = bisect_left(self._jobs, state)
return self._jobs.index(state, left_index, right_index + 1)
- def __enter__(self):
- self._events.__enter__()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._events.__exit__(exc_type, exc_val, exc_tb)
-
- @property
- def events(self) -> EventSource:
- return self._events
-
def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
return [
state.schedule
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 2669d03..59a345a 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -2,7 +2,6 @@ from __future__ import annotations
import operator
from collections import defaultdict
-from contextlib import ExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
from typing import Any, Callable, ClassVar, Iterable
@@ -18,7 +17,7 @@ from pymongo import ASCENDING, DeleteOne, MongoClient, UpdateOne
from pymongo.collection import Collection
from pymongo.errors import ConnectionFailure, DuplicateKeyError
-from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
+from ..abc import EventBroker, Job, Schedule, Serializer
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
from ..eventbrokers.local import LocalEventBroker
from ..events import (
@@ -41,7 +40,7 @@ from ..exceptions import (
)
from ..serializers.pickle import PickleSerializer
from ..structures import JobResult, RetrySettings, Task
-from ..util import reentrant
+from .base import BaseDataStore
class CustomEncoder(TypeEncoder):
@@ -62,14 +61,14 @@ def ensure_uuid_presentation(client: MongoClient) -> None:
pass
-@reentrant
@attrs.define(eq=False)
-class MongoDBDataStore(DataStore):
+class MongoDBDataStore(BaseDataStore):
client: MongoClient = attrs.field(validator=instance_of(MongoClient))
serializer: Serializer = attrs.field(factory=PickleSerializer, kw_only=True)
+ event_broker = attrs.field(factory=LocalEventBroker, kw_only=True)
database: str = attrs.field(default="apscheduler", kw_only=True)
lock_expiration_delay: float = attrs.field(default=30, kw_only=True)
- retry_settings: RetrySettings = attrs.field(default=RetrySettings())
+ retry_settings: RetrySettings = attrs.field(default=RetrySettings(), kw_only=True)
start_from_scratch: bool = attrs.field(default=False, kw_only=True)
_task_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Task)]
@@ -79,8 +78,6 @@ class MongoDBDataStore(DataStore):
_job_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Job)]
_logger: Logger = attrs.field(init=False, factory=lambda: getLogger(__name__))
- _exit_stack: ExitStack = attrs.field(init=False, factory=ExitStack)
- _events: EventBroker = attrs.field(init=False, factory=LocalEventBroker)
_local_tasks: dict[str, Task] = attrs.field(init=False, factory=dict)
def __attrs_post_init__(self) -> None:
@@ -108,10 +105,6 @@ class MongoDBDataStore(DataStore):
client = MongoClient(uri)
return cls(client, **options)
- @property
- def events(self) -> EventSource:
- return self._events
-
def _retry(self) -> tenacity.Retrying:
return tenacity.Retrying(
stop=self.retry_settings.stop,
@@ -128,7 +121,9 @@ class MongoDBDataStore(DataStore):
retry_state.outcome.exception(),
)
- def __enter__(self):
+ def start(self, event_broker: EventBroker) -> None:
+ super().start(event_broker)
+
server_info = self.client.server_info()
if server_info["versionArray"] < [4, 0]:
raise RuntimeError(
@@ -136,9 +131,6 @@ class MongoDBDataStore(DataStore):
f"{server_info['version']}"
)
- self._exit_stack.__enter__()
- self._exit_stack.enter_context(self._events)
-
for attempt in self._retry():
with attempt, self.client.start_session() as session:
if self.start_from_scratch:
@@ -153,11 +145,6 @@ class MongoDBDataStore(DataStore):
self._jobs.create_index("tags", session=session)
self._jobs_results.create_index("finished_at", session=session)
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
-
def add_task(self, task: Task) -> None:
for attempt in self._retry():
with attempt:
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index cca223e..9ee03fa 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -31,9 +31,8 @@ from sqlalchemy.future import Engine, create_engine
from sqlalchemy.sql.ddl import DropTable
from sqlalchemy.sql.elements import BindParameter, literal
-from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
+from ..abc import EventBroker, Job, Schedule, Serializer
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
-from ..eventbrokers.local import LocalEventBroker
from ..events import (
Event,
JobAcquired,
@@ -52,7 +51,7 @@ from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..serializers.pickle import PickleSerializer
from ..structures import JobResult, RetrySettings, Task
-from ..util import reentrant
+from .base import BaseDataStore
class EmulatedUUID(TypeDecorator):
@@ -219,13 +218,10 @@ class _BaseSQLAlchemyDataStore:
return jobs
-@reentrant
@attrs.define(eq=False)
-class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
+class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore):
engine: Engine
- _events: EventBroker = attrs.field(init=False, factory=LocalEventBroker)
-
@classmethod
def from_url(cls, url: str | URL, **options) -> SQLAlchemyDataStore:
engine = create_engine(url)
@@ -240,7 +236,9 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
reraise=True,
)
- def __enter__(self):
+ def start(self, event_broker: EventBroker) -> None:
+ super().start(event_broker)
+
for attempt in self._retry():
with attempt, self.engine.begin() as conn:
if self.start_from_scratch:
@@ -259,16 +257,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
f"only version 1 is supported by this version of APScheduler"
)
- self._events.__enter__()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._events.__exit__(exc_type, exc_val, exc_tb)
-
- @property
- def events(self) -> EventSource:
- return self._events
-
def add_task(self, task: Task) -> None:
insert = self.t_tasks.insert().values(
id=task.id,