diff options
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 165 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 644 |
2 files changed, 274 insertions, 535 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 75972b5..44fee27 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -5,6 +5,7 @@ import platform import random import sys from asyncio import CancelledError +from collections.abc import MutableMapping from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger @@ -16,9 +17,9 @@ import anyio import attrs from anyio import TASK_STATUS_IGNORED, create_task_group, move_on_after from anyio.abc import TaskGroup, TaskStatus +from attr.validators import instance_of -from .._context import current_scheduler -from .._converters import as_async_datastore, as_async_eventbroker +from .._context import current_async_scheduler from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState from .._events import ( Event, @@ -35,11 +36,14 @@ from .._exceptions import ( ScheduleLookupError, ) from .._structures import Job, JobResult, Schedule, Task -from ..abc import AsyncDataStore, AsyncEventBroker, Subscription, Trigger +from .._worker import Worker +from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger from ..datastores.memory import MemoryDataStore -from ..eventbrokers.async_local import LocalAsyncEventBroker +from ..eventbrokers.local import LocalEventBroker +from ..executors.async_ import AsyncJobExecutor +from ..executors.subprocess import ProcessPoolJobExecutor +from ..executors.thread import ThreadPoolJobExecutor from ..marshalling import callable_to_ref -from ..workers.async_ import AsyncWorker if sys.version_info >= (3, 11): from typing import Self @@ -54,19 +58,24 @@ _zero_timedelta = timedelta() class AsyncScheduler: """An asynchronous (AnyIO based) scheduler implementation.""" - data_store: AsyncDataStore = attrs.field( - converter=as_async_datastore, factory=MemoryDataStore + data_store: DataStore = attrs.field( + validator=instance_of(DataStore), factory=MemoryDataStore ) - event_broker: AsyncEventBroker = attrs.field( - converter=as_async_eventbroker, factory=LocalAsyncEventBroker + event_broker: EventBroker = attrs.field( + validator=instance_of(EventBroker), factory=LocalEventBroker ) identity: str = attrs.field(kw_only=True, default=None) - start_worker: bool = attrs.field(kw_only=True, default=True) + process_jobs: bool = attrs.field(kw_only=True, default=True) + job_executors: MutableMapping[str, JobExecutor] | None = attrs.field( + kw_only=True, default=None + ) + default_job_executor: str | None = attrs.field(kw_only=True, default=None) + process_schedules: bool = attrs.field(kw_only=True, default=True) logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__)) _state: RunState = attrs.field(init=False, default=RunState.stopped) _task_group: TaskGroup | None = attrs.field(init=False, default=None) - _exit_stack: AsyncExitStack | None = attrs.field(init=False, default=None) + _exit_stack: AsyncExitStack = attrs.field(init=False, factory=AsyncExitStack) _services_initialized: bool = attrs.field(init=False, default=False) _wakeup_event: anyio.Event = attrs.field(init=False) _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) @@ -76,13 +85,32 @@ class AsyncScheduler: if not self.identity: self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" + if not self.job_executors: + self.job_executors = { + "async": AsyncJobExecutor(), + "threadpool": ThreadPoolJobExecutor(), + "processpool": ProcessPoolJobExecutor(), + } + + if not self.default_job_executor: + self.default_job_executor = next(iter(self.job_executors)) + elif self.default_job_executor not in self.job_executors: + raise ValueError( + "default_job_executor must be one of the given job executors" + ) + async def __aenter__(self: Self) -> Self: - self._exit_stack = AsyncExitStack() await self._exit_stack.__aenter__() - await self._ensure_services_ready(self._exit_stack) - self._task_group = await self._exit_stack.enter_async_context( - create_task_group() - ) + try: + await self._ensure_services_initialized(self._exit_stack) + self._task_group = await self._exit_stack.enter_async_context( + create_task_group() + ) + self._exit_stack.callback(setattr, self, "_task_group", None) + except BaseException as exc: + await self._exit_stack.__aexit__(type(exc), exc, exc.__traceback__) + raise + return self async def __aexit__( @@ -93,27 +121,25 @@ class AsyncScheduler: ) -> None: await self.stop() await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) - self._task_group = None - async def _ensure_services_ready(self, exit_stack: AsyncExitStack) -> None: + async def _ensure_services_initialized(self, exit_stack: AsyncExitStack) -> None: """Ensure that the data store and event broker have been initialized.""" if not self._services_initialized: self._services_initialized = True exit_stack.callback(setattr, self, "_services_initialized", False) - # Initialize the event broker - await self.event_broker.start() - exit_stack.push_async_exit( - lambda *exc_info: self.event_broker.stop(force=exc_info[0] is not None) - ) + await self.event_broker.start(exit_stack) + await self.data_store.start(exit_stack, self.event_broker) - # Initialize the data store - await self.data_store.start(self.event_broker) - exit_stack.push_async_exit( - lambda *exc_info: self.data_store.stop(force=exc_info[0] is not None) + def _check_initialized(self) -> None: + if not self._services_initialized: + raise RuntimeError( + "The scheduler has not been initialized yet. Use the scheduler as an " + "async context manager (async with ...) in order to call methods other " + "than run_until_complete()." ) - def _schedule_added_or_modified(self, event: Event) -> None: + async def _schedule_added_or_modified(self, event: Event) -> None: event_ = cast("ScheduleAdded | ScheduleUpdated", event) if not self._wakeup_deadline or ( event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline @@ -128,6 +154,35 @@ class AsyncScheduler: """The current running state of the scheduler.""" return self._state + def subscribe( + self, + callback: Callable[[Event], Any], + event_types: Iterable[type[Event]] | None = None, + *, + one_shot: bool = False, + is_async: bool = True, + ) -> Subscription: + """ + Subscribe to events. + + To unsubscribe, call the :meth:`Subscription.unsubscribe` method on the returned + object. + + :param callback: callable to be called with the event object when an event is + published + :param event_types: an iterable of concrete Event classes to subscribe to + :param one_shot: if ``True``, automatically unsubscribe after the first matching + event + :param is_async: ``True`` if the (synchronous) callback should be called on the + event loop thread, ``False`` if it should be called in a worker thread. + If the callback is a coroutine function, this flag is ignored. + + """ + self._check_initialized() + return self.event_broker.subscribe( + callback, event_types, is_async=is_async, one_shot=one_shot + ) + async def add_schedule( self, func_or_task_id: str | Callable, @@ -136,6 +191,7 @@ class AsyncScheduler: id: str | None = None, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + job_executor: str | None = None, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None = None, max_jitter: float | timedelta | None = None, @@ -152,6 +208,7 @@ class AsyncScheduler: based ID will be assigned) :param args: positional arguments to be passed to the task function :param kwargs: keyword arguments to be passed to the task function + :param job_executor: name of the job executor to run the task with :param coalesce: determines what to do when processing the schedule if multiple fire times have become due for this schedule since the last processing :param misfire_grace_time: maximum number of seconds the scheduled job's actual @@ -165,6 +222,7 @@ class AsyncScheduler: :return: the ID of the newly added schedule """ + self._check_initialized() id = id or str(uuid4()) args = tuple(args or ()) kwargs = dict(kwargs or {}) @@ -173,7 +231,11 @@ class AsyncScheduler: misfire_grace_time = timedelta(seconds=misfire_grace_time) if callable(func_or_task_id): - task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id) + task = Task( + id=callable_to_ref(func_or_task_id), + func=func_or_task_id, + executor=job_executor or self.default_job_executor, + ) await self.data_store.add_task(task) else: task = await self.data_store.get_task(func_or_task_id) @@ -207,6 +269,7 @@ class AsyncScheduler: :raises ScheduleLookupError: if the schedule could not be found """ + self._check_initialized() schedules = await self.data_store.get_schedules({id}) if schedules: return schedules[0] @@ -220,6 +283,7 @@ class AsyncScheduler: :return: a list of schedules, in an unspecified order """ + self._check_initialized() return await self.data_store.get_schedules() async def remove_schedule(self, id: str) -> None: @@ -229,6 +293,7 @@ class AsyncScheduler: :param id: the unique identifier of the schedule """ + self._check_initialized() await self.data_store.remove_schedules({id}) async def add_job( @@ -237,6 +302,7 @@ class AsyncScheduler: *, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + job_executor: str | None = None, tags: Iterable[str] | None = None, result_expiration_time: timedelta | float = 0, ) -> UUID: @@ -244,8 +310,10 @@ class AsyncScheduler: Add a job to the data store. :param func_or_task_id: + :param job_executor: name of the job executor to run the task with :param args: positional arguments to call the target callable with :param kwargs: keyword arguments to call the target callable with + :param job_executor: name of the job executor to run the task with :param tags: strings that can be used to categorize and filter the job :param result_expiration_time: the minimum time (as seconds, or timedelta) to keep the result of the job available for fetching (the result won't be @@ -253,8 +321,13 @@ class AsyncScheduler: :return: the ID of the newly created job """ + self._check_initialized() if callable(func_or_task_id): - task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id) + task = Task( + id=callable_to_ref(func_or_task_id), + func=func_or_task_id, + executor=job_executor or self.default_job_executor, + ) await self.data_store.add_task(task) else: task = await self.data_store.get_task(func_or_task_id) @@ -280,13 +353,14 @@ class AsyncScheduler: the data store """ + self._check_initialized() wait_event = anyio.Event() def listener(event: JobReleased) -> None: if event.job_id == job_id: wait_event.set() - with self.data_store.events.subscribe(listener, {JobReleased}): + with self.event_broker.subscribe(listener, {JobReleased}): result = await self.data_store.get_job_result(job_id) if result: return result @@ -303,6 +377,7 @@ class AsyncScheduler: *, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + job_executor: str | None = None, tags: Iterable[str] | None = (), ) -> Any: """ @@ -314,10 +389,12 @@ class AsyncScheduler: definition :param args: positional arguments to be passed to the task function :param kwargs: keyword arguments to be passed to the task function + :param job_executor: name of the job executor to run the task with :param tags: strings that can be used to categorize and filter the job :returns: the return value of the task function """ + self._check_initialized() job_complete_event = anyio.Event() def listener(event: JobReleased) -> None: @@ -325,11 +402,12 @@ class AsyncScheduler: job_complete_event.set() job_id: UUID | None = None - with self.data_store.events.subscribe(listener, {JobReleased}): + with self.event_broker.subscribe(listener, {JobReleased}): job_id = await self.add_job( func_or_task_id, args=args, kwargs=kwargs, + job_executor=job_executor, tags=tags, result_expiration_time=timedelta(minutes=15), ) @@ -378,12 +456,7 @@ class AsyncScheduler: await event.wait() async def start_in_background(self) -> None: - if self._task_group is None: - raise RuntimeError( - "The scheduler must be used as an async context manager (async with " - "...) in order to be startable in the background" - ) - + self._check_initialized() await self._task_group.start(self.run_until_stopped) async def run_until_stopped( @@ -398,7 +471,7 @@ class AsyncScheduler: self._state = RunState.starting async with AsyncExitStack() as exit_stack: self._wakeup_event = anyio.Event() - await self._ensure_services_ready(exit_stack) + await self._ensure_services_initialized(exit_stack) # Wake up the scheduler if the data store emits a significant schedule event exit_stack.enter_context( @@ -407,14 +480,16 @@ class AsyncScheduler: ) ) + # Set this scheduler as the current scheduler + token = current_async_scheduler.set(self) + exit_stack.callback(current_async_scheduler.reset, token) + # Start the built-in worker, if configured to do so - if self.start_worker: - token = current_scheduler.set(self) - exit_stack.callback(current_scheduler.reset, token) - worker = AsyncWorker( - self.data_store, self.event_broker, is_internal=True + if self.process_jobs: + worker = Worker(job_executors=self.job_executors) + await worker.start( + exit_stack, self.data_store, self.event_broker, self.identity ) - await exit_stack.enter_async_context(worker) # Signal that the scheduler has started self._state = RunState.started diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index d98161a..3a812a4 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -1,77 +1,100 @@ from __future__ import annotations import atexit -import os -import platform -import random +import logging import sys import threading -from concurrent.futures import Future +from collections.abc import MutableMapping from contextlib import ExitStack -from datetime import datetime, timedelta, timezone -from logging import Logger, getLogger +from datetime import timedelta +from functools import partial +from logging import Logger from types import TracebackType -from typing import Any, Callable, Iterable, Mapping, cast -from uuid import UUID, uuid4 - -import attrs - -from .._context import current_scheduler -from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState -from .._events import ( - Event, - JobReleased, - ScheduleAdded, - SchedulerStarted, - SchedulerStopped, - ScheduleUpdated, -) -from .._exceptions import ( - JobCancelled, - JobDeadlineMissed, - JobLookupError, - ScheduleLookupError, -) -from .._structures import Job, JobResult, Schedule, Task -from ..abc import DataStore, EventBroker, Trigger -from ..datastores.memory import MemoryDataStore -from ..eventbrokers.local import LocalEventBroker -from ..marshalling import callable_to_ref -from ..workers.sync import Worker +from typing import Any, Callable, Iterable, Mapping +from uuid import UUID + +from anyio import start_blocking_portal +from anyio.from_thread import BlockingPortal + +from .. import Event, current_scheduler +from .._enums import CoalescePolicy, ConflictPolicy, RunState +from .._structures import JobResult, Schedule +from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger +from .async_ import AsyncScheduler if sys.version_info >= (3, 11): from typing import Self else: from typing_extensions import Self -_microsecond_delta = timedelta(microseconds=1) -_zero_timedelta = timedelta() - -@attrs.define(eq=False) class Scheduler: """A synchronous scheduler implementation.""" - data_store: DataStore = attrs.field(factory=MemoryDataStore) - event_broker: EventBroker = attrs.field(factory=LocalEventBroker) - identity: str = attrs.field(kw_only=True, default=None) - start_worker: bool = attrs.field(kw_only=True, default=True) - logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__)) + def __init__( + self, + data_store: DataStore | None = None, + event_broker: EventBroker | None = None, + *, + identity: str | None = None, + process_schedules: bool = True, + start_worker: bool = True, + job_executors: Mapping[str, JobExecutor] | None = None, + default_job_executor: str | None = None, + logger: Logger | None = None, + ): + kwargs: dict[str, Any] = {} + if data_store is not None: + kwargs["data_store"] = data_store + if event_broker is not None: + kwargs["event_broker"] = event_broker + + if not default_job_executor and not job_executors: + default_job_executor = "threadpool" + + self._async_scheduler = AsyncScheduler( + identity=identity, + process_schedules=process_schedules, + process_jobs=start_worker, + job_executors=job_executors, + default_job_executor=default_job_executor, + logger=logger or logging.getLogger(__name__), + **kwargs, + ) + self._exit_stack = ExitStack() + self._portal: BlockingPortal | None = None + self._lock = threading.RLock() - _state: RunState = attrs.field(init=False, default=RunState.stopped) - _thread: threading.Thread | None = attrs.field(init=False, default=None) - _wakeup_event: threading.Event = attrs.field(init=False, factory=threading.Event) - _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) - _services_initialized: bool = attrs.field(init=False, default=False) - _exit_stack: ExitStack | None = attrs.field(init=False, default=None) - _lock: threading.RLock = attrs.field(init=False, factory=threading.RLock) + @property + def data_store(self) -> DataStore: + return self._async_scheduler.data_store - def __attrs_post_init__(self) -> None: - if not self.identity: - self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" + @property + def event_broker(self) -> EventBroker: + return self._async_scheduler.event_broker + + @property + def identity(self) -> str: + return self._async_scheduler.identity + + @property + def process_schedules(self) -> bool: + return self._async_scheduler.process_schedules + + @property + def start_worker(self) -> bool: + return self._async_scheduler.process_jobs + + @property + def job_executors(self) -> MutableMapping[str, JobExecutor]: + return self._async_scheduler.job_executors + + @property + def state(self) -> RunState: + """The current running state of the scheduler.""" + return self._async_scheduler.state def __enter__(self: Self) -> Self: - self._exit_stack = ExitStack() self._ensure_services_ready(self._exit_stack) return self @@ -81,13 +104,12 @@ class Scheduler: exc_val: BaseException, exc_tb: TracebackType, ) -> None: - self.stop() self._exit_stack.__exit__(exc_type, exc_val, exc_tb) def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None: - """Ensure that the data store and event broker have been initialized.""" + """Ensure that the underlying asynchronous scheduler has been initialized.""" with self._lock: - if not self._services_initialized: + if self._portal is None: if exit_stack is None: if self._exit_stack is None: self._exit_stack = exit_stack = ExitStack() @@ -95,43 +117,39 @@ class Scheduler: else: exit_stack = self._exit_stack - self._services_initialized = True - exit_stack.callback(setattr, self, "_services_initialized", False) + # Set this scheduler as the current synchronous scheduler + token = current_scheduler.set(self) + exit_stack.callback(current_scheduler.reset, token) - self.event_broker.start() - exit_stack.push( - lambda *exc_info: self.event_broker.stop( - force=exc_info[0] is not None - ) + self._portal = exit_stack.enter_context(start_blocking_portal()) + exit_stack.callback(setattr, self, "_portal", None) + exit_stack.enter_context( + self._portal.wrap_async_context_manager(self._async_scheduler) ) - # Initialize the data store - self.data_store.start(self.event_broker) - exit_stack.push( - lambda *exc_info: self.data_store.stop( - force=exc_info[0] is not None - ) - ) + def subscribe( + self, + callback: Callable[[Event], Any], + event_types: Iterable[type[Event]] | None = None, + *, + one_shot: bool = False, + ) -> Subscription: + """ + Subscribe to events. - def _schedule_added_or_modified(self, event: Event) -> None: - event_ = cast("ScheduleAdded | ScheduleUpdated", event) - if not self._wakeup_deadline or ( - event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline - ): - self.logger.debug( - "Detected a %s event – waking up the scheduler", type(event).__name__ - ) - self._wakeup_event.set() + To unsubscribe, call the :meth:`Subscription.unsubscribe` method on the returned + object. - def _join_thread(self) -> None: - if self._thread: - self._thread.join() - self._thread = None + :param callback: callable to be called with the event object when an event is + published + :param event_types: an iterable of concrete Event classes to subscribe to + :param one_shot: if ``True``, automatically unsubscribe after the first matching + event - @property - def state(self) -> RunState: - """The current running state of the scheduler.""" - return self._state + """ + return self.data_store.event_broker.subscribe( + callback, event_types, is_async=False, one_shot=one_shot + ) def add_schedule( self, @@ -141,104 +159,42 @@ class Scheduler: id: str | None = None, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + job_executor: str | None = None, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None = None, max_jitter: float | timedelta | None = None, tags: Iterable[str] | None = None, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing, ) -> str: - """ - Schedule a task to be run one or more times in the future. - - :param func_or_task_id: either a callable or an ID of an existing task - definition - :param trigger: determines the times when the task should be run - :param id: an explicit identifier for the schedule (if omitted, a random, UUID - based ID will be assigned) - :param args: positional arguments to be passed to the task function - :param kwargs: keyword arguments to be passed to the task function - :param coalesce: determines what to do when processing the schedule if multiple - fire times have become due for this schedule since the last processing - :param misfire_grace_time: maximum number of seconds the scheduled job's actual - run time is allowed to be late, compared to the scheduled run time - :param max_jitter: maximum number of seconds to randomly add to the scheduled - time for each job created from this schedule - :param tags: strings that can be used to categorize and filter the schedule and - its derivative jobs - :param conflict_policy: determines what to do if a schedule with the same ID - already exists in the data store - :return: the ID of the newly added schedule - - """ self._ensure_services_ready() - id = id or str(uuid4()) - args = tuple(args or ()) - kwargs = dict(kwargs or {}) - tags = frozenset(tags or ()) - if isinstance(misfire_grace_time, (int, float)): - misfire_grace_time = timedelta(seconds=misfire_grace_time) - - if callable(func_or_task_id): - task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id) - self.data_store.add_task(task) - else: - task = self.data_store.get_task(func_or_task_id) - - schedule = Schedule( - id=id, - task_id=task.id, - trigger=trigger, - args=args, - kwargs=kwargs, - coalesce=coalesce, - misfire_grace_time=misfire_grace_time, - max_jitter=max_jitter, - tags=tags, - ) - schedule.next_fire_time = trigger.next() - self.data_store.add_schedule(schedule, conflict_policy) - self.logger.info( - "Added new schedule (task=%r, trigger=%r); next run time at %s", - task, - trigger, - schedule.next_fire_time, + return self._portal.call( + partial( + self._async_scheduler.add_schedule, + func_or_task_id, + trigger, + id=id, + args=args, + kwargs=kwargs, + job_executor=job_executor, + coalesce=coalesce, + misfire_grace_time=misfire_grace_time, + max_jitter=max_jitter, + tags=tags, + conflict_policy=conflict_policy, + ) ) - return schedule.id def get_schedule(self, id: str) -> Schedule: - """ - Retrieve a schedule from the data store. - - :param id: the unique identifier of the schedule - :raises ScheduleLookupError: if the schedule could not be found - - """ self._ensure_services_ready() - schedules = self.data_store.get_schedules({id}) - if schedules: - return schedules[0] - else: - raise ScheduleLookupError(id) + return self._portal.call(self._async_scheduler.get_schedule, id) def get_schedules(self) -> list[Schedule]: - """ - Retrieve all schedules from the data store. - - :return: a list of schedules, in an unspecified order - - """ self._ensure_services_ready() - return self.data_store.get_schedules() + return self._portal.call(self._async_scheduler.get_schedules) def remove_schedule(self, id: str) -> None: - """ - Remove the given schedule from the data store. - - :param id: the unique identifier of the schedule - - """ self._ensure_services_ready() - self.data_store.remove_schedules({id}) + self._portal.call(self._async_scheduler.remove_schedule, id) def add_job( self, @@ -246,68 +202,28 @@ class Scheduler: *, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + job_executor: str | None = None, tags: Iterable[str] | None = None, result_expiration_time: timedelta | float = 0, ) -> UUID: - """ - Add a job to the data store. - - :param func_or_task_id: either a callable or an ID of an existing task - definition - :param args: positional arguments to be passed to the task function - :param kwargs: keyword arguments to be passed to the task function - :param tags: strings that can be used to categorize and filter the job - :param result_expiration_time: the minimum time (as seconds, or timedelta) to - keep the result of the job available for fetching (the result won't be - saved at all if that time is 0) - :return: the ID of the newly created job - - """ self._ensure_services_ready() - if callable(func_or_task_id): - task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id) - self.data_store.add_task(task) - else: - task = self.data_store.get_task(func_or_task_id) - - job = Job( - task_id=task.id, - args=args or (), - kwargs=kwargs or {}, - tags=tags or frozenset(), - result_expiration_time=result_expiration_time, + return self._portal.call( + partial( + self._async_scheduler.add_job, + func_or_task_id, + args=args, + kwargs=kwargs, + job_executor=job_executor, + tags=tags, + result_expiration_time=result_expiration_time, + ) ) - self.data_store.add_job(job) - return job.id def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult: - """ - Retrieve the result of a job. - - :param job_id: the ID of the job - :param wait: if ``True``, wait until the job has ended (one way or another), - ``False`` to raise an exception if the result is not yet available - :raises JobLookupError: if ``wait=False`` and the job result does not exist in - the data store - - """ self._ensure_services_ready() - wait_event = threading.Event() - - def listener(event: JobReleased) -> None: - if event.job_id == job_id: - wait_event.set() - - with self.data_store.events.subscribe(listener, {JobReleased}, one_shot=True): - result = self.data_store.get_job_result(job_id) - if result: - return result - elif not wait: - raise JobLookupError(job_id) - - wait_event.wait() - - return self.data_store.get_job_result(job_id) + return self._portal.call( + partial(self._async_scheduler.get_job_result, job_id, wait=wait) + ) def run_job( self, @@ -315,50 +231,20 @@ class Scheduler: *, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + job_executor: str | None = None, tags: Iterable[str] | None = (), ) -> Any: - """ - Convenience method to add a job and then return its result. - - If the job raised an exception, that exception will be reraised here. - - :param func_or_task_id: either a callable or an ID of an existing task - definition - :param args: positional arguments to be passed to the task function - :param kwargs: keyword arguments to be passed to the task function - :param tags: strings that can be used to categorize and filter the job - :returns: the return value of the task function - - """ self._ensure_services_ready() - job_complete_event = threading.Event() - - def listener(event: JobReleased) -> None: - if event.job_id == job_id: - job_complete_event.set() - - job_id: UUID | None = None - with self.data_store.events.subscribe(listener, {JobReleased}): - job_id = self.add_job( + return self._portal.call( + partial( + self._async_scheduler.run_job, func_or_task_id, args=args, kwargs=kwargs, + job_executor=job_executor, tags=tags, - result_expiration_time=timedelta(minutes=15), ) - job_complete_event.wait() - - result = self.get_job_result(job_id) - if result.outcome is JobOutcome.success: - return result.return_value - elif result.outcome is JobOutcome.error: - raise result.exception - elif result.outcome is JobOutcome.missed_start_deadline: - raise JobDeadlineMissed - elif result.outcome is JobOutcome.cancelled: - raise JobCancelled - else: - raise RuntimeError(f"Unknown job outcome: {result.outcome}") + ) def start_in_background(self) -> None: """ @@ -370,241 +256,19 @@ class Scheduler: :raises RuntimeError: if the scheduler is not in the ``stopped`` state """ - with self._lock: - if self._state is not RunState.stopped: - raise RuntimeError( - f'Cannot start the scheduler when it is in the "{self._state}" ' - f"state" - ) - - self._state = RunState.starting - - start_future: Future[None] = Future() - self._thread = threading.Thread( - target=self._run, args=[start_future], daemon=True - ) - self._thread.start() - try: - start_future.result() - except BaseException: - self._thread = None - raise - - self._exit_stack.callback(self._join_thread) - self._exit_stack.callback(self.stop) + self._ensure_services_ready() + self._portal.call(self._async_scheduler.start_in_background) def stop(self) -> None: - """ - Signal the scheduler that it should stop processing schedules. - - This method does not wait for the scheduler to actually stop. - For that, see :meth:`wait_until_stopped`. - - """ - with self._lock: - if self._state is RunState.started: - self._state = RunState.stopping - self._wakeup_event.set() + if self._portal is not None: + self._portal.call(self._async_scheduler.stop) def wait_until_stopped(self) -> None: - """ - Wait until the scheduler is in the "stopped" or "stopping" state. - - If the scheduler is already stopped or in the process of stopping, this method - returns immediately. Otherwise, it waits until the scheduler posts the - ``SchedulerStopped`` event. - - """ - with self._lock: - if self._state in (RunState.stopped, RunState.stopping): - return - - event = threading.Event() - sub = self.event_broker.subscribe( - lambda ev: event.set(), {SchedulerStopped}, one_shot=True - ) - - with sub: - event.wait() + if self._portal is not None: + self._portal.call(self._async_scheduler.wait_until_stopped) def run_until_stopped(self) -> None: - """ - Run the scheduler (and its internal worker) until it is explicitly stopped. - - This method will only return if :meth:`stop` is called. - - """ - with self._lock: - if self._state is not RunState.stopped: - raise RuntimeError( - f'Cannot start the scheduler when it is in the "{self._state}" ' - f"state" - ) - - self._state = RunState.starting - - self._run(None) - - def _run(self, start_future: Future[None] | None) -> None: - assert self._state is RunState.starting - with self._exit_stack.pop_all() as exit_stack: - try: - self._ensure_services_ready(exit_stack) - - # Wake up the scheduler if the data store emits a significant schedule - # event - exit_stack.enter_context( - self.data_store.events.subscribe( - self._schedule_added_or_modified, - {ScheduleAdded, ScheduleUpdated}, - ) - ) - - # Start the built-in worker, if configured to do so - if self.start_worker: - token = current_scheduler.set(self) - exit_stack.callback(current_scheduler.reset, token) - worker = Worker( - self.data_store, self.event_broker, is_internal=True - ) - exit_stack.enter_context(worker) - - # Signal that the scheduler has started - self._state = RunState.started - self.event_broker.publish_local(SchedulerStarted()) - except BaseException as exc: - if start_future: - start_future.set_exception(exc) - return - else: - raise - else: - if start_future: - start_future.set_result(None) - - exception: BaseException | None = None - try: - while self._state is RunState.started: - schedules = self.data_store.acquire_schedules(self.identity, 100) - self.logger.debug( - "Processing %d schedules retrieved from the data store", - len(schedules), - ) - now = datetime.now(timezone.utc) - for schedule in schedules: - # Calculate a next fire time for the schedule, if possible - fire_times = [schedule.next_fire_time] - calculate_next = schedule.trigger.next - while True: - try: - fire_time = calculate_next() - except Exception: - self.logger.exception( - "Error computing next fire time for schedule %r of " - "task %r – removing schedule", - schedule.id, - schedule.task_id, - ) - break - - # Stop if the calculated fire time is in the future - if fire_time is None or fire_time > now: - schedule.next_fire_time = fire_time - break - - # Only keep all the fire times if coalesce policy = "all" - if schedule.coalesce is CoalescePolicy.all: - fire_times.append(fire_time) - elif schedule.coalesce is CoalescePolicy.latest: - fire_times[0] = fire_time - - # Add one or more jobs to the job queue - max_jitter = ( - schedule.max_jitter.total_seconds() - if schedule.max_jitter - else 0 - ) - for i, fire_time in enumerate(fire_times): - # Calculate a jitter if max_jitter > 0 - jitter = _zero_timedelta - if max_jitter: - if i + 1 < len(fire_times): - next_fire_time = fire_times[i + 1] - else: - next_fire_time = schedule.next_fire_time - - if next_fire_time is not None: - # Jitter must never be so high that it would cause - # a fire time to equal or exceed the next fire time - jitter_s = min( - [ - max_jitter, - ( - next_fire_time - - fire_time - - _microsecond_delta - ).total_seconds(), - ] - ) - jitter = timedelta( - seconds=random.uniform(0, jitter_s) - ) - fire_time += jitter - - schedule.last_fire_time = fire_time - job = Job( - task_id=schedule.task_id, - args=schedule.args, - kwargs=schedule.kwargs, - schedule_id=schedule.id, - scheduled_fire_time=fire_time, - jitter=jitter, - start_deadline=schedule.next_deadline, - tags=schedule.tags, - ) - self.data_store.add_job(job) - - # Update the schedules (and release the scheduler's claim on them) - self.data_store.release_schedules(self.identity, schedules) - - # If we received fewer schedules than the maximum amount, sleep - # until the next schedule is due or the scheduler is explicitly - # woken up - wait_time = None - if len(schedules) < 100: - self._wakeup_deadline = ( - self.data_store.get_next_schedule_run_time() - ) - if self._wakeup_deadline: - wait_time = ( - self._wakeup_deadline - datetime.now(timezone.utc) - ).total_seconds() - self.logger.debug( - "Sleeping %.3f seconds until the next fire time (%s)", - wait_time, - self._wakeup_deadline, - ) - else: - self.logger.debug("Waiting for any due schedules to appear") - - if self._wakeup_event.wait(wait_time): - self._wakeup_event = threading.Event() - else: - self.logger.debug( - "Processing more schedules on the next iteration" - ) - except BaseException as exc: - exception = exc - raise - finally: - self._state = RunState.stopped - if isinstance(exception, Exception): - self.logger.exception("Scheduler crashed") - elif exception: - self.logger.info( - f"Scheduler stopped due to {exception.__class__.__name__}" - ) - else: - self.logger.info("Scheduler stopped") - - self.event_broker.publish_local(SchedulerStopped(exception=exception)) + with ExitStack() as exit_stack: + # Run the async scheduler + self._ensure_services_ready(exit_stack) + self._portal.call(self._async_scheduler.run_until_stopped) |