diff options
Diffstat (limited to 'apscheduler/abc.py')
-rw-r--r-- | apscheduler/abc.py | 159 |
1 files changed, 74 insertions, 85 deletions
diff --git a/apscheduler/abc.py b/apscheduler/abc.py index 4bab22b..0acf7fd 100644 --- a/apscheduler/abc.py +++ b/apscheduler/abc.py @@ -3,11 +3,10 @@ from base64 import b64decode, b64encode from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import ( - Any, AsyncContextManager, Callable, Dict, FrozenSet, Iterable, Iterator, List, Mapping, - Optional, Set) -from uuid import uuid4 + Any, Callable, Dict, FrozenSet, Iterable, Iterator, List, Optional, Set, Type) +from uuid import UUID, uuid4 -from apscheduler.events import Event +from .policies import CoalescePolicy, ConflictPolicy class Trigger(Iterator[datetime], metaclass=ABCMeta): @@ -60,7 +59,7 @@ class Schedule: trigger: Trigger = field(compare=False) args: tuple = field(compare=False) kwargs: Dict[str, Any] = field(compare=False) - coalesce: bool = field(compare=False) + coalesce: CoalescePolicy = field(compare=False) misfire_grace_time: Optional[timedelta] = field(compare=False) tags: FrozenSet[str] = field(compare=False) next_fire_time: Optional[datetime] = field(compare=False, default=None) @@ -76,13 +75,13 @@ class Schedule: @dataclass(unsafe_hash=True) class Job: - id: str = field(init=False, default_factory=lambda: str(uuid4())) + id: UUID = field(init=False, default_factory=uuid4) task_id: str = field(compare=False) func: Callable = field(compare=False) args: tuple = field(compare=False) kwargs: Dict[str, Any] = field(compare=False) schedule_id: Optional[str] = field(compare=False, default=None) - scheduled_start_time: Optional[datetime] = field(compare=False, default=None) + scheduled_fire_time: Optional[datetime] = field(compare=False, default=None) start_deadline: Optional[datetime] = field(compare=False, default=None) tags: Optional[FrozenSet[str]] = field(compare=False, default_factory=frozenset) started_at: Optional[datetime] = field(init=False, compare=False, default=None) @@ -106,22 +105,44 @@ class Serializer(metaclass=ABCMeta): return self.deserialize(b64decode(serialized)) -class EventSource(metaclass=ABCMeta): - __slots__ = () +@dataclass(frozen=True) +class Event: + timestamp: datetime + +@dataclass +class EventSource(metaclass=ABCMeta): @abstractmethod - async def subscribe(self, callback: Callable[[Event], Any]) -> None: + def subscribe(self, callback: Callable[[Event], Any], + event_types: Optional[Iterable[Type[Event]]] = None) -> None: """ Subscribe to events from this event source. :param callback: callable to be called with the event object when an event is published - :return: an async iterable yielding event objects + :param event_types: an iterable of concrete Event classes to subscribe to """ + @abstractmethod + def unsubscribe(self, callback: Callable[[Event], Any], + event_types: Optional[Iterable[Type[Event]]] = None) -> None: + """ + Cancel an event subscription -class DataStore(EventSource): - __slots__ = () + :param callback: + :param event_types: an iterable of concrete Event classes to unsubscribe from + :return: + """ + @abstractmethod + async def publish(self, event: Event) -> None: + """ + Publish an event. + + :param event: the event to publish + """ + + +class DataStore(EventSource): async def __aenter__(self): return self @@ -138,112 +159,80 @@ class DataStore(EventSource): """ @abstractmethod - async def add_or_replace_schedules(self, schedules: Iterable[Schedule]) -> None: + async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None: """ Add or update the given schedule in the data store. - :param schedules: schedules to be added or updated + :param schedule: schedule to be added + :param conflict_policy: policy that determines what to do if there is an existing schedule + with the same ID """ @abstractmethod - async def update_schedules(self, updates: Mapping[str, Dict[str, Any]]) -> Set[str]: - """ - Update one or more existing schedules. - - :param updates: mapping of schedule ID to attribute names to be updated - :return: the set of schedule IDs that were modified by this operation. - """ - - @abstractmethod - async def remove_schedules(self, ids: Optional[Set[str]] = None) -> None: + async def remove_schedules(self, ids: Iterable[str]) -> None: """ Remove schedules from the data store. - :param ids: a specific set of schedule IDs to remove, or ``None`` in which case all - schedules are removed + :param ids: a specific set of schedule IDs to remove """ @abstractmethod - async def get_next_fire_time(self) -> Optional[datetime]: + async def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: """ - Return the earliest fire time among all unclaimed schedules. + Acquire unclaimed due schedules for processing. - If no running, unclaimed schedules exist, ``None`` is returned. + This method claims up to the requested number of schedules for the given scheduler and + returns them. + + :param scheduler_id: unique identifier of the scheduler + :param limit: maximum number of schedules to claim + :return: the list of claimed schedules """ @abstractmethod - async def acquire_due_schedules( - self, scheduler_id: str, - max_scheduled_time: datetime) -> AsyncContextManager[List[Schedule]]: + async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: """ - Acquire an undefined amount of due schedules not claimed by any other scheduler. + Release the claims on the given schedules and update them on the store. - This method claims due schedules for the given scheduler and returns them. - When the scheduler has updated the objects, it calls :meth:`release_due_schedules` to - release the claim on them. + :param scheduler_id: unique identifier of the scheduler + :param schedules: the previously claimed schedules """ - # @abstractmethod - # async def release_due_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: - # """ - # Update the given schedules and release the claim on them held by this scheduler. - # - # This method should do the following: - # - # #. Remove any of the schedules in the store that have no next fire time - # #. Update the schedules that do have a next fire time - # #. Release any locks held on the schedules by this scheduler - # - # :param scheduler_id: identifier of the scheduler - # :param schedules: schedules previously acquired using :meth:`acquire_due_schedules` - # """ - - # @abstractmethod - # async def acquire_job(self, worker_id: str, tags: Set[str]) -> Job: - # """ - # Claim and return the next matching job from the queue. - # - # :return: the acquired job - # """ - # - # @abstractmethod - # async def release_job(self, job: Job) -> None: - # """Remove the given job from the queue.""" - - -class Executor(EventSource): @abstractmethod - async def submit_job(self, job: Job) -> None: + async def add_job(self, job: Job) -> None: """ - Submit a task to be run in this executor. - - The executor may alter the ``id`` attribute of the job before returning. + Add a job to be executed by an eligible worker. :param job: the job object """ @abstractmethod - async def get_jobs(self) -> List[Job]: + async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: """ - Get jobs currently queued or running in this executor. + Get the list of pending jobs. - :return: list of jobs + :param ids: a specific set of job IDs to return, or ``None`` to return all jobs + :return: the list of matching pending jobs, in the order they will be given to workers """ + @abstractmethod + async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: + """ + Acquire unclaimed jobs for execution. -class EventHub(metaclass=ABCMeta): - __slots__ = () - - async def start(self) -> None: - pass + This method claims up to the requested number of jobs for the given worker and returns + them. - async def stop(self) -> None: - pass + :param worker_id: unique identifier of the worker + :param limit: maximum number of jobs to claim and return + :return: the list of claimed jobs + """ @abstractmethod - async def publish(self, event: Event) -> None: - """Publish an event.""" + async def release_jobs(self, worker_id: str, jobs: List[Job]) -> None: + """ + Releases the claim on the given jobs - @abstractmethod - async def subscribe(self, callback: Callable[[Event], Any]) -> None: - """Add a callback to be called when a new event is published.""" + :param worker_id: unique identifier of the worker + :param jobs: the previously claimed jobs + """ |