summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-12 22:09:05 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-21 02:40:02 +0300
commitc5727432736b55b7d76753307f14efdb962c2edf (patch)
tree005bd129694b56bd601d65c4cdf43828cfcd4381 /src/apscheduler/schedulers
parent26c4db062145fcb4f623ecfda96c42ce2414e8e1 (diff)
downloadapscheduler-c5727432736b55b7d76753307f14efdb962c2edf.tar.gz
Major refactoring
- Made SyncScheduler a synchronous wrapper for AsyncScheduler - Removed workers as a user interface - Removed synchronous interfaces for data stores and event brokers and refactored existing implementations to use the async interface - Added the current_async_scheduler contextvar - Added job executors
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r--src/apscheduler/schedulers/async_.py165
-rw-r--r--src/apscheduler/schedulers/sync.py644
2 files changed, 274 insertions, 535 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 75972b5..44fee27 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -5,6 +5,7 @@ import platform
import random
import sys
from asyncio import CancelledError
+from collections.abc import MutableMapping
from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
@@ -16,9 +17,9 @@ import anyio
import attrs
from anyio import TASK_STATUS_IGNORED, create_task_group, move_on_after
from anyio.abc import TaskGroup, TaskStatus
+from attr.validators import instance_of
-from .._context import current_scheduler
-from .._converters import as_async_datastore, as_async_eventbroker
+from .._context import current_async_scheduler
from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from .._events import (
Event,
@@ -35,11 +36,14 @@ from .._exceptions import (
ScheduleLookupError,
)
from .._structures import Job, JobResult, Schedule, Task
-from ..abc import AsyncDataStore, AsyncEventBroker, Subscription, Trigger
+from .._worker import Worker
+from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger
from ..datastores.memory import MemoryDataStore
-from ..eventbrokers.async_local import LocalAsyncEventBroker
+from ..eventbrokers.local import LocalEventBroker
+from ..executors.async_ import AsyncJobExecutor
+from ..executors.subprocess import ProcessPoolJobExecutor
+from ..executors.thread import ThreadPoolJobExecutor
from ..marshalling import callable_to_ref
-from ..workers.async_ import AsyncWorker
if sys.version_info >= (3, 11):
from typing import Self
@@ -54,19 +58,24 @@ _zero_timedelta = timedelta()
class AsyncScheduler:
"""An asynchronous (AnyIO based) scheduler implementation."""
- data_store: AsyncDataStore = attrs.field(
- converter=as_async_datastore, factory=MemoryDataStore
+ data_store: DataStore = attrs.field(
+ validator=instance_of(DataStore), factory=MemoryDataStore
)
- event_broker: AsyncEventBroker = attrs.field(
- converter=as_async_eventbroker, factory=LocalAsyncEventBroker
+ event_broker: EventBroker = attrs.field(
+ validator=instance_of(EventBroker), factory=LocalEventBroker
)
identity: str = attrs.field(kw_only=True, default=None)
- start_worker: bool = attrs.field(kw_only=True, default=True)
+ process_jobs: bool = attrs.field(kw_only=True, default=True)
+ job_executors: MutableMapping[str, JobExecutor] | None = attrs.field(
+ kw_only=True, default=None
+ )
+ default_job_executor: str | None = attrs.field(kw_only=True, default=None)
+ process_schedules: bool = attrs.field(kw_only=True, default=True)
logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
_state: RunState = attrs.field(init=False, default=RunState.stopped)
_task_group: TaskGroup | None = attrs.field(init=False, default=None)
- _exit_stack: AsyncExitStack | None = attrs.field(init=False, default=None)
+ _exit_stack: AsyncExitStack = attrs.field(init=False, factory=AsyncExitStack)
_services_initialized: bool = attrs.field(init=False, default=False)
_wakeup_event: anyio.Event = attrs.field(init=False)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
@@ -76,13 +85,32 @@ class AsyncScheduler:
if not self.identity:
self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
+ if not self.job_executors:
+ self.job_executors = {
+ "async": AsyncJobExecutor(),
+ "threadpool": ThreadPoolJobExecutor(),
+ "processpool": ProcessPoolJobExecutor(),
+ }
+
+ if not self.default_job_executor:
+ self.default_job_executor = next(iter(self.job_executors))
+ elif self.default_job_executor not in self.job_executors:
+ raise ValueError(
+ "default_job_executor must be one of the given job executors"
+ )
+
async def __aenter__(self: Self) -> Self:
- self._exit_stack = AsyncExitStack()
await self._exit_stack.__aenter__()
- await self._ensure_services_ready(self._exit_stack)
- self._task_group = await self._exit_stack.enter_async_context(
- create_task_group()
- )
+ try:
+ await self._ensure_services_initialized(self._exit_stack)
+ self._task_group = await self._exit_stack.enter_async_context(
+ create_task_group()
+ )
+ self._exit_stack.callback(setattr, self, "_task_group", None)
+ except BaseException as exc:
+ await self._exit_stack.__aexit__(type(exc), exc, exc.__traceback__)
+ raise
+
return self
async def __aexit__(
@@ -93,27 +121,25 @@ class AsyncScheduler:
) -> None:
await self.stop()
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
- self._task_group = None
- async def _ensure_services_ready(self, exit_stack: AsyncExitStack) -> None:
+ async def _ensure_services_initialized(self, exit_stack: AsyncExitStack) -> None:
"""Ensure that the data store and event broker have been initialized."""
if not self._services_initialized:
self._services_initialized = True
exit_stack.callback(setattr, self, "_services_initialized", False)
- # Initialize the event broker
- await self.event_broker.start()
- exit_stack.push_async_exit(
- lambda *exc_info: self.event_broker.stop(force=exc_info[0] is not None)
- )
+ await self.event_broker.start(exit_stack)
+ await self.data_store.start(exit_stack, self.event_broker)
- # Initialize the data store
- await self.data_store.start(self.event_broker)
- exit_stack.push_async_exit(
- lambda *exc_info: self.data_store.stop(force=exc_info[0] is not None)
+ def _check_initialized(self) -> None:
+ if not self._services_initialized:
+ raise RuntimeError(
+ "The scheduler has not been initialized yet. Use the scheduler as an "
+ "async context manager (async with ...) in order to call methods other "
+ "than run_until_complete()."
)
- def _schedule_added_or_modified(self, event: Event) -> None:
+ async def _schedule_added_or_modified(self, event: Event) -> None:
event_ = cast("ScheduleAdded | ScheduleUpdated", event)
if not self._wakeup_deadline or (
event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline
@@ -128,6 +154,35 @@ class AsyncScheduler:
"""The current running state of the scheduler."""
return self._state
+ def subscribe(
+ self,
+ callback: Callable[[Event], Any],
+ event_types: Iterable[type[Event]] | None = None,
+ *,
+ one_shot: bool = False,
+ is_async: bool = True,
+ ) -> Subscription:
+ """
+ Subscribe to events.
+
+ To unsubscribe, call the :meth:`Subscription.unsubscribe` method on the returned
+ object.
+
+ :param callback: callable to be called with the event object when an event is
+ published
+ :param event_types: an iterable of concrete Event classes to subscribe to
+ :param one_shot: if ``True``, automatically unsubscribe after the first matching
+ event
+ :param is_async: ``True`` if the (synchronous) callback should be called on the
+ event loop thread, ``False`` if it should be called in a worker thread.
+ If the callback is a coroutine function, this flag is ignored.
+
+ """
+ self._check_initialized()
+ return self.event_broker.subscribe(
+ callback, event_types, is_async=is_async, one_shot=one_shot
+ )
+
async def add_schedule(
self,
func_or_task_id: str | Callable,
@@ -136,6 +191,7 @@ class AsyncScheduler:
id: str | None = None,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
+ job_executor: str | None = None,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None = None,
max_jitter: float | timedelta | None = None,
@@ -152,6 +208,7 @@ class AsyncScheduler:
based ID will be assigned)
:param args: positional arguments to be passed to the task function
:param kwargs: keyword arguments to be passed to the task function
+ :param job_executor: name of the job executor to run the task with
:param coalesce: determines what to do when processing the schedule if multiple
fire times have become due for this schedule since the last processing
:param misfire_grace_time: maximum number of seconds the scheduled job's actual
@@ -165,6 +222,7 @@ class AsyncScheduler:
:return: the ID of the newly added schedule
"""
+ self._check_initialized()
id = id or str(uuid4())
args = tuple(args or ())
kwargs = dict(kwargs or {})
@@ -173,7 +231,11 @@ class AsyncScheduler:
misfire_grace_time = timedelta(seconds=misfire_grace_time)
if callable(func_or_task_id):
- task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id)
+ task = Task(
+ id=callable_to_ref(func_or_task_id),
+ func=func_or_task_id,
+ executor=job_executor or self.default_job_executor,
+ )
await self.data_store.add_task(task)
else:
task = await self.data_store.get_task(func_or_task_id)
@@ -207,6 +269,7 @@ class AsyncScheduler:
:raises ScheduleLookupError: if the schedule could not be found
"""
+ self._check_initialized()
schedules = await self.data_store.get_schedules({id})
if schedules:
return schedules[0]
@@ -220,6 +283,7 @@ class AsyncScheduler:
:return: a list of schedules, in an unspecified order
"""
+ self._check_initialized()
return await self.data_store.get_schedules()
async def remove_schedule(self, id: str) -> None:
@@ -229,6 +293,7 @@ class AsyncScheduler:
:param id: the unique identifier of the schedule
"""
+ self._check_initialized()
await self.data_store.remove_schedules({id})
async def add_job(
@@ -237,6 +302,7 @@ class AsyncScheduler:
*,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
+ job_executor: str | None = None,
tags: Iterable[str] | None = None,
result_expiration_time: timedelta | float = 0,
) -> UUID:
@@ -244,8 +310,10 @@ class AsyncScheduler:
Add a job to the data store.
:param func_or_task_id:
+ :param job_executor: name of the job executor to run the task with
:param args: positional arguments to call the target callable with
:param kwargs: keyword arguments to call the target callable with
+ :param job_executor: name of the job executor to run the task with
:param tags: strings that can be used to categorize and filter the job
:param result_expiration_time: the minimum time (as seconds, or timedelta) to
keep the result of the job available for fetching (the result won't be
@@ -253,8 +321,13 @@ class AsyncScheduler:
:return: the ID of the newly created job
"""
+ self._check_initialized()
if callable(func_or_task_id):
- task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id)
+ task = Task(
+ id=callable_to_ref(func_or_task_id),
+ func=func_or_task_id,
+ executor=job_executor or self.default_job_executor,
+ )
await self.data_store.add_task(task)
else:
task = await self.data_store.get_task(func_or_task_id)
@@ -280,13 +353,14 @@ class AsyncScheduler:
the data store
"""
+ self._check_initialized()
wait_event = anyio.Event()
def listener(event: JobReleased) -> None:
if event.job_id == job_id:
wait_event.set()
- with self.data_store.events.subscribe(listener, {JobReleased}):
+ with self.event_broker.subscribe(listener, {JobReleased}):
result = await self.data_store.get_job_result(job_id)
if result:
return result
@@ -303,6 +377,7 @@ class AsyncScheduler:
*,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
+ job_executor: str | None = None,
tags: Iterable[str] | None = (),
) -> Any:
"""
@@ -314,10 +389,12 @@ class AsyncScheduler:
definition
:param args: positional arguments to be passed to the task function
:param kwargs: keyword arguments to be passed to the task function
+ :param job_executor: name of the job executor to run the task with
:param tags: strings that can be used to categorize and filter the job
:returns: the return value of the task function
"""
+ self._check_initialized()
job_complete_event = anyio.Event()
def listener(event: JobReleased) -> None:
@@ -325,11 +402,12 @@ class AsyncScheduler:
job_complete_event.set()
job_id: UUID | None = None
- with self.data_store.events.subscribe(listener, {JobReleased}):
+ with self.event_broker.subscribe(listener, {JobReleased}):
job_id = await self.add_job(
func_or_task_id,
args=args,
kwargs=kwargs,
+ job_executor=job_executor,
tags=tags,
result_expiration_time=timedelta(minutes=15),
)
@@ -378,12 +456,7 @@ class AsyncScheduler:
await event.wait()
async def start_in_background(self) -> None:
- if self._task_group is None:
- raise RuntimeError(
- "The scheduler must be used as an async context manager (async with "
- "...) in order to be startable in the background"
- )
-
+ self._check_initialized()
await self._task_group.start(self.run_until_stopped)
async def run_until_stopped(
@@ -398,7 +471,7 @@ class AsyncScheduler:
self._state = RunState.starting
async with AsyncExitStack() as exit_stack:
self._wakeup_event = anyio.Event()
- await self._ensure_services_ready(exit_stack)
+ await self._ensure_services_initialized(exit_stack)
# Wake up the scheduler if the data store emits a significant schedule event
exit_stack.enter_context(
@@ -407,14 +480,16 @@ class AsyncScheduler:
)
)
+ # Set this scheduler as the current scheduler
+ token = current_async_scheduler.set(self)
+ exit_stack.callback(current_async_scheduler.reset, token)
+
# Start the built-in worker, if configured to do so
- if self.start_worker:
- token = current_scheduler.set(self)
- exit_stack.callback(current_scheduler.reset, token)
- worker = AsyncWorker(
- self.data_store, self.event_broker, is_internal=True
+ if self.process_jobs:
+ worker = Worker(job_executors=self.job_executors)
+ await worker.start(
+ exit_stack, self.data_store, self.event_broker, self.identity
)
- await exit_stack.enter_async_context(worker)
# Signal that the scheduler has started
self._state = RunState.started
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index d98161a..3a812a4 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -1,77 +1,100 @@
from __future__ import annotations
import atexit
-import os
-import platform
-import random
+import logging
import sys
import threading
-from concurrent.futures import Future
+from collections.abc import MutableMapping
from contextlib import ExitStack
-from datetime import datetime, timedelta, timezone
-from logging import Logger, getLogger
+from datetime import timedelta
+from functools import partial
+from logging import Logger
from types import TracebackType
-from typing import Any, Callable, Iterable, Mapping, cast
-from uuid import UUID, uuid4
-
-import attrs
-
-from .._context import current_scheduler
-from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
-from .._events import (
- Event,
- JobReleased,
- ScheduleAdded,
- SchedulerStarted,
- SchedulerStopped,
- ScheduleUpdated,
-)
-from .._exceptions import (
- JobCancelled,
- JobDeadlineMissed,
- JobLookupError,
- ScheduleLookupError,
-)
-from .._structures import Job, JobResult, Schedule, Task
-from ..abc import DataStore, EventBroker, Trigger
-from ..datastores.memory import MemoryDataStore
-from ..eventbrokers.local import LocalEventBroker
-from ..marshalling import callable_to_ref
-from ..workers.sync import Worker
+from typing import Any, Callable, Iterable, Mapping
+from uuid import UUID
+
+from anyio import start_blocking_portal
+from anyio.from_thread import BlockingPortal
+
+from .. import Event, current_scheduler
+from .._enums import CoalescePolicy, ConflictPolicy, RunState
+from .._structures import JobResult, Schedule
+from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger
+from .async_ import AsyncScheduler
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
-_microsecond_delta = timedelta(microseconds=1)
-_zero_timedelta = timedelta()
-
-@attrs.define(eq=False)
class Scheduler:
"""A synchronous scheduler implementation."""
- data_store: DataStore = attrs.field(factory=MemoryDataStore)
- event_broker: EventBroker = attrs.field(factory=LocalEventBroker)
- identity: str = attrs.field(kw_only=True, default=None)
- start_worker: bool = attrs.field(kw_only=True, default=True)
- logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
+ def __init__(
+ self,
+ data_store: DataStore | None = None,
+ event_broker: EventBroker | None = None,
+ *,
+ identity: str | None = None,
+ process_schedules: bool = True,
+ start_worker: bool = True,
+ job_executors: Mapping[str, JobExecutor] | None = None,
+ default_job_executor: str | None = None,
+ logger: Logger | None = None,
+ ):
+ kwargs: dict[str, Any] = {}
+ if data_store is not None:
+ kwargs["data_store"] = data_store
+ if event_broker is not None:
+ kwargs["event_broker"] = event_broker
+
+ if not default_job_executor and not job_executors:
+ default_job_executor = "threadpool"
+
+ self._async_scheduler = AsyncScheduler(
+ identity=identity,
+ process_schedules=process_schedules,
+ process_jobs=start_worker,
+ job_executors=job_executors,
+ default_job_executor=default_job_executor,
+ logger=logger or logging.getLogger(__name__),
+ **kwargs,
+ )
+ self._exit_stack = ExitStack()
+ self._portal: BlockingPortal | None = None
+ self._lock = threading.RLock()
- _state: RunState = attrs.field(init=False, default=RunState.stopped)
- _thread: threading.Thread | None = attrs.field(init=False, default=None)
- _wakeup_event: threading.Event = attrs.field(init=False, factory=threading.Event)
- _wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
- _services_initialized: bool = attrs.field(init=False, default=False)
- _exit_stack: ExitStack | None = attrs.field(init=False, default=None)
- _lock: threading.RLock = attrs.field(init=False, factory=threading.RLock)
+ @property
+ def data_store(self) -> DataStore:
+ return self._async_scheduler.data_store
- def __attrs_post_init__(self) -> None:
- if not self.identity:
- self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
+ @property
+ def event_broker(self) -> EventBroker:
+ return self._async_scheduler.event_broker
+
+ @property
+ def identity(self) -> str:
+ return self._async_scheduler.identity
+
+ @property
+ def process_schedules(self) -> bool:
+ return self._async_scheduler.process_schedules
+
+ @property
+ def start_worker(self) -> bool:
+ return self._async_scheduler.process_jobs
+
+ @property
+ def job_executors(self) -> MutableMapping[str, JobExecutor]:
+ return self._async_scheduler.job_executors
+
+ @property
+ def state(self) -> RunState:
+ """The current running state of the scheduler."""
+ return self._async_scheduler.state
def __enter__(self: Self) -> Self:
- self._exit_stack = ExitStack()
self._ensure_services_ready(self._exit_stack)
return self
@@ -81,13 +104,12 @@ class Scheduler:
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
- self.stop()
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None:
- """Ensure that the data store and event broker have been initialized."""
+ """Ensure that the underlying asynchronous scheduler has been initialized."""
with self._lock:
- if not self._services_initialized:
+ if self._portal is None:
if exit_stack is None:
if self._exit_stack is None:
self._exit_stack = exit_stack = ExitStack()
@@ -95,43 +117,39 @@ class Scheduler:
else:
exit_stack = self._exit_stack
- self._services_initialized = True
- exit_stack.callback(setattr, self, "_services_initialized", False)
+ # Set this scheduler as the current synchronous scheduler
+ token = current_scheduler.set(self)
+ exit_stack.callback(current_scheduler.reset, token)
- self.event_broker.start()
- exit_stack.push(
- lambda *exc_info: self.event_broker.stop(
- force=exc_info[0] is not None
- )
+ self._portal = exit_stack.enter_context(start_blocking_portal())
+ exit_stack.callback(setattr, self, "_portal", None)
+ exit_stack.enter_context(
+ self._portal.wrap_async_context_manager(self._async_scheduler)
)
- # Initialize the data store
- self.data_store.start(self.event_broker)
- exit_stack.push(
- lambda *exc_info: self.data_store.stop(
- force=exc_info[0] is not None
- )
- )
+ def subscribe(
+ self,
+ callback: Callable[[Event], Any],
+ event_types: Iterable[type[Event]] | None = None,
+ *,
+ one_shot: bool = False,
+ ) -> Subscription:
+ """
+ Subscribe to events.
- def _schedule_added_or_modified(self, event: Event) -> None:
- event_ = cast("ScheduleAdded | ScheduleUpdated", event)
- if not self._wakeup_deadline or (
- event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline
- ):
- self.logger.debug(
- "Detected a %s event – waking up the scheduler", type(event).__name__
- )
- self._wakeup_event.set()
+ To unsubscribe, call the :meth:`Subscription.unsubscribe` method on the returned
+ object.
- def _join_thread(self) -> None:
- if self._thread:
- self._thread.join()
- self._thread = None
+ :param callback: callable to be called with the event object when an event is
+ published
+ :param event_types: an iterable of concrete Event classes to subscribe to
+ :param one_shot: if ``True``, automatically unsubscribe after the first matching
+ event
- @property
- def state(self) -> RunState:
- """The current running state of the scheduler."""
- return self._state
+ """
+ return self.data_store.event_broker.subscribe(
+ callback, event_types, is_async=False, one_shot=one_shot
+ )
def add_schedule(
self,
@@ -141,104 +159,42 @@ class Scheduler:
id: str | None = None,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
+ job_executor: str | None = None,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None = None,
max_jitter: float | timedelta | None = None,
tags: Iterable[str] | None = None,
conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing,
) -> str:
- """
- Schedule a task to be run one or more times in the future.
-
- :param func_or_task_id: either a callable or an ID of an existing task
- definition
- :param trigger: determines the times when the task should be run
- :param id: an explicit identifier for the schedule (if omitted, a random, UUID
- based ID will be assigned)
- :param args: positional arguments to be passed to the task function
- :param kwargs: keyword arguments to be passed to the task function
- :param coalesce: determines what to do when processing the schedule if multiple
- fire times have become due for this schedule since the last processing
- :param misfire_grace_time: maximum number of seconds the scheduled job's actual
- run time is allowed to be late, compared to the scheduled run time
- :param max_jitter: maximum number of seconds to randomly add to the scheduled
- time for each job created from this schedule
- :param tags: strings that can be used to categorize and filter the schedule and
- its derivative jobs
- :param conflict_policy: determines what to do if a schedule with the same ID
- already exists in the data store
- :return: the ID of the newly added schedule
-
- """
self._ensure_services_ready()
- id = id or str(uuid4())
- args = tuple(args or ())
- kwargs = dict(kwargs or {})
- tags = frozenset(tags or ())
- if isinstance(misfire_grace_time, (int, float)):
- misfire_grace_time = timedelta(seconds=misfire_grace_time)
-
- if callable(func_or_task_id):
- task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id)
- self.data_store.add_task(task)
- else:
- task = self.data_store.get_task(func_or_task_id)
-
- schedule = Schedule(
- id=id,
- task_id=task.id,
- trigger=trigger,
- args=args,
- kwargs=kwargs,
- coalesce=coalesce,
- misfire_grace_time=misfire_grace_time,
- max_jitter=max_jitter,
- tags=tags,
- )
- schedule.next_fire_time = trigger.next()
- self.data_store.add_schedule(schedule, conflict_policy)
- self.logger.info(
- "Added new schedule (task=%r, trigger=%r); next run time at %s",
- task,
- trigger,
- schedule.next_fire_time,
+ return self._portal.call(
+ partial(
+ self._async_scheduler.add_schedule,
+ func_or_task_id,
+ trigger,
+ id=id,
+ args=args,
+ kwargs=kwargs,
+ job_executor=job_executor,
+ coalesce=coalesce,
+ misfire_grace_time=misfire_grace_time,
+ max_jitter=max_jitter,
+ tags=tags,
+ conflict_policy=conflict_policy,
+ )
)
- return schedule.id
def get_schedule(self, id: str) -> Schedule:
- """
- Retrieve a schedule from the data store.
-
- :param id: the unique identifier of the schedule
- :raises ScheduleLookupError: if the schedule could not be found
-
- """
self._ensure_services_ready()
- schedules = self.data_store.get_schedules({id})
- if schedules:
- return schedules[0]
- else:
- raise ScheduleLookupError(id)
+ return self._portal.call(self._async_scheduler.get_schedule, id)
def get_schedules(self) -> list[Schedule]:
- """
- Retrieve all schedules from the data store.
-
- :return: a list of schedules, in an unspecified order
-
- """
self._ensure_services_ready()
- return self.data_store.get_schedules()
+ return self._portal.call(self._async_scheduler.get_schedules)
def remove_schedule(self, id: str) -> None:
- """
- Remove the given schedule from the data store.
-
- :param id: the unique identifier of the schedule
-
- """
self._ensure_services_ready()
- self.data_store.remove_schedules({id})
+ self._portal.call(self._async_scheduler.remove_schedule, id)
def add_job(
self,
@@ -246,68 +202,28 @@ class Scheduler:
*,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
+ job_executor: str | None = None,
tags: Iterable[str] | None = None,
result_expiration_time: timedelta | float = 0,
) -> UUID:
- """
- Add a job to the data store.
-
- :param func_or_task_id: either a callable or an ID of an existing task
- definition
- :param args: positional arguments to be passed to the task function
- :param kwargs: keyword arguments to be passed to the task function
- :param tags: strings that can be used to categorize and filter the job
- :param result_expiration_time: the minimum time (as seconds, or timedelta) to
- keep the result of the job available for fetching (the result won't be
- saved at all if that time is 0)
- :return: the ID of the newly created job
-
- """
self._ensure_services_ready()
- if callable(func_or_task_id):
- task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id)
- self.data_store.add_task(task)
- else:
- task = self.data_store.get_task(func_or_task_id)
-
- job = Job(
- task_id=task.id,
- args=args or (),
- kwargs=kwargs or {},
- tags=tags or frozenset(),
- result_expiration_time=result_expiration_time,
+ return self._portal.call(
+ partial(
+ self._async_scheduler.add_job,
+ func_or_task_id,
+ args=args,
+ kwargs=kwargs,
+ job_executor=job_executor,
+ tags=tags,
+ result_expiration_time=result_expiration_time,
+ )
)
- self.data_store.add_job(job)
- return job.id
def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult:
- """
- Retrieve the result of a job.
-
- :param job_id: the ID of the job
- :param wait: if ``True``, wait until the job has ended (one way or another),
- ``False`` to raise an exception if the result is not yet available
- :raises JobLookupError: if ``wait=False`` and the job result does not exist in
- the data store
-
- """
self._ensure_services_ready()
- wait_event = threading.Event()
-
- def listener(event: JobReleased) -> None:
- if event.job_id == job_id:
- wait_event.set()
-
- with self.data_store.events.subscribe(listener, {JobReleased}, one_shot=True):
- result = self.data_store.get_job_result(job_id)
- if result:
- return result
- elif not wait:
- raise JobLookupError(job_id)
-
- wait_event.wait()
-
- return self.data_store.get_job_result(job_id)
+ return self._portal.call(
+ partial(self._async_scheduler.get_job_result, job_id, wait=wait)
+ )
def run_job(
self,
@@ -315,50 +231,20 @@ class Scheduler:
*,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
+ job_executor: str | None = None,
tags: Iterable[str] | None = (),
) -> Any:
- """
- Convenience method to add a job and then return its result.
-
- If the job raised an exception, that exception will be reraised here.
-
- :param func_or_task_id: either a callable or an ID of an existing task
- definition
- :param args: positional arguments to be passed to the task function
- :param kwargs: keyword arguments to be passed to the task function
- :param tags: strings that can be used to categorize and filter the job
- :returns: the return value of the task function
-
- """
self._ensure_services_ready()
- job_complete_event = threading.Event()
-
- def listener(event: JobReleased) -> None:
- if event.job_id == job_id:
- job_complete_event.set()
-
- job_id: UUID | None = None
- with self.data_store.events.subscribe(listener, {JobReleased}):
- job_id = self.add_job(
+ return self._portal.call(
+ partial(
+ self._async_scheduler.run_job,
func_or_task_id,
args=args,
kwargs=kwargs,
+ job_executor=job_executor,
tags=tags,
- result_expiration_time=timedelta(minutes=15),
)
- job_complete_event.wait()
-
- result = self.get_job_result(job_id)
- if result.outcome is JobOutcome.success:
- return result.return_value
- elif result.outcome is JobOutcome.error:
- raise result.exception
- elif result.outcome is JobOutcome.missed_start_deadline:
- raise JobDeadlineMissed
- elif result.outcome is JobOutcome.cancelled:
- raise JobCancelled
- else:
- raise RuntimeError(f"Unknown job outcome: {result.outcome}")
+ )
def start_in_background(self) -> None:
"""
@@ -370,241 +256,19 @@ class Scheduler:
:raises RuntimeError: if the scheduler is not in the ``stopped`` state
"""
- with self._lock:
- if self._state is not RunState.stopped:
- raise RuntimeError(
- f'Cannot start the scheduler when it is in the "{self._state}" '
- f"state"
- )
-
- self._state = RunState.starting
-
- start_future: Future[None] = Future()
- self._thread = threading.Thread(
- target=self._run, args=[start_future], daemon=True
- )
- self._thread.start()
- try:
- start_future.result()
- except BaseException:
- self._thread = None
- raise
-
- self._exit_stack.callback(self._join_thread)
- self._exit_stack.callback(self.stop)
+ self._ensure_services_ready()
+ self._portal.call(self._async_scheduler.start_in_background)
def stop(self) -> None:
- """
- Signal the scheduler that it should stop processing schedules.
-
- This method does not wait for the scheduler to actually stop.
- For that, see :meth:`wait_until_stopped`.
-
- """
- with self._lock:
- if self._state is RunState.started:
- self._state = RunState.stopping
- self._wakeup_event.set()
+ if self._portal is not None:
+ self._portal.call(self._async_scheduler.stop)
def wait_until_stopped(self) -> None:
- """
- Wait until the scheduler is in the "stopped" or "stopping" state.
-
- If the scheduler is already stopped or in the process of stopping, this method
- returns immediately. Otherwise, it waits until the scheduler posts the
- ``SchedulerStopped`` event.
-
- """
- with self._lock:
- if self._state in (RunState.stopped, RunState.stopping):
- return
-
- event = threading.Event()
- sub = self.event_broker.subscribe(
- lambda ev: event.set(), {SchedulerStopped}, one_shot=True
- )
-
- with sub:
- event.wait()
+ if self._portal is not None:
+ self._portal.call(self._async_scheduler.wait_until_stopped)
def run_until_stopped(self) -> None:
- """
- Run the scheduler (and its internal worker) until it is explicitly stopped.
-
- This method will only return if :meth:`stop` is called.
-
- """
- with self._lock:
- if self._state is not RunState.stopped:
- raise RuntimeError(
- f'Cannot start the scheduler when it is in the "{self._state}" '
- f"state"
- )
-
- self._state = RunState.starting
-
- self._run(None)
-
- def _run(self, start_future: Future[None] | None) -> None:
- assert self._state is RunState.starting
- with self._exit_stack.pop_all() as exit_stack:
- try:
- self._ensure_services_ready(exit_stack)
-
- # Wake up the scheduler if the data store emits a significant schedule
- # event
- exit_stack.enter_context(
- self.data_store.events.subscribe(
- self._schedule_added_or_modified,
- {ScheduleAdded, ScheduleUpdated},
- )
- )
-
- # Start the built-in worker, if configured to do so
- if self.start_worker:
- token = current_scheduler.set(self)
- exit_stack.callback(current_scheduler.reset, token)
- worker = Worker(
- self.data_store, self.event_broker, is_internal=True
- )
- exit_stack.enter_context(worker)
-
- # Signal that the scheduler has started
- self._state = RunState.started
- self.event_broker.publish_local(SchedulerStarted())
- except BaseException as exc:
- if start_future:
- start_future.set_exception(exc)
- return
- else:
- raise
- else:
- if start_future:
- start_future.set_result(None)
-
- exception: BaseException | None = None
- try:
- while self._state is RunState.started:
- schedules = self.data_store.acquire_schedules(self.identity, 100)
- self.logger.debug(
- "Processing %d schedules retrieved from the data store",
- len(schedules),
- )
- now = datetime.now(timezone.utc)
- for schedule in schedules:
- # Calculate a next fire time for the schedule, if possible
- fire_times = [schedule.next_fire_time]
- calculate_next = schedule.trigger.next
- while True:
- try:
- fire_time = calculate_next()
- except Exception:
- self.logger.exception(
- "Error computing next fire time for schedule %r of "
- "task %r – removing schedule",
- schedule.id,
- schedule.task_id,
- )
- break
-
- # Stop if the calculated fire time is in the future
- if fire_time is None or fire_time > now:
- schedule.next_fire_time = fire_time
- break
-
- # Only keep all the fire times if coalesce policy = "all"
- if schedule.coalesce is CoalescePolicy.all:
- fire_times.append(fire_time)
- elif schedule.coalesce is CoalescePolicy.latest:
- fire_times[0] = fire_time
-
- # Add one or more jobs to the job queue
- max_jitter = (
- schedule.max_jitter.total_seconds()
- if schedule.max_jitter
- else 0
- )
- for i, fire_time in enumerate(fire_times):
- # Calculate a jitter if max_jitter > 0
- jitter = _zero_timedelta
- if max_jitter:
- if i + 1 < len(fire_times):
- next_fire_time = fire_times[i + 1]
- else:
- next_fire_time = schedule.next_fire_time
-
- if next_fire_time is not None:
- # Jitter must never be so high that it would cause
- # a fire time to equal or exceed the next fire time
- jitter_s = min(
- [
- max_jitter,
- (
- next_fire_time
- - fire_time
- - _microsecond_delta
- ).total_seconds(),
- ]
- )
- jitter = timedelta(
- seconds=random.uniform(0, jitter_s)
- )
- fire_time += jitter
-
- schedule.last_fire_time = fire_time
- job = Job(
- task_id=schedule.task_id,
- args=schedule.args,
- kwargs=schedule.kwargs,
- schedule_id=schedule.id,
- scheduled_fire_time=fire_time,
- jitter=jitter,
- start_deadline=schedule.next_deadline,
- tags=schedule.tags,
- )
- self.data_store.add_job(job)
-
- # Update the schedules (and release the scheduler's claim on them)
- self.data_store.release_schedules(self.identity, schedules)
-
- # If we received fewer schedules than the maximum amount, sleep
- # until the next schedule is due or the scheduler is explicitly
- # woken up
- wait_time = None
- if len(schedules) < 100:
- self._wakeup_deadline = (
- self.data_store.get_next_schedule_run_time()
- )
- if self._wakeup_deadline:
- wait_time = (
- self._wakeup_deadline - datetime.now(timezone.utc)
- ).total_seconds()
- self.logger.debug(
- "Sleeping %.3f seconds until the next fire time (%s)",
- wait_time,
- self._wakeup_deadline,
- )
- else:
- self.logger.debug("Waiting for any due schedules to appear")
-
- if self._wakeup_event.wait(wait_time):
- self._wakeup_event = threading.Event()
- else:
- self.logger.debug(
- "Processing more schedules on the next iteration"
- )
- except BaseException as exc:
- exception = exc
- raise
- finally:
- self._state = RunState.stopped
- if isinstance(exception, Exception):
- self.logger.exception("Scheduler crashed")
- elif exception:
- self.logger.info(
- f"Scheduler stopped due to {exception.__class__.__name__}"
- )
- else:
- self.logger.info("Scheduler stopped")
-
- self.event_broker.publish_local(SchedulerStopped(exception=exception))
+ with ExitStack() as exit_stack:
+ # Run the async scheduler
+ self._ensure_services_ready(exit_stack)
+ self._portal.call(self._async_scheduler.run_until_stopped)