summaryrefslogtreecommitdiff
path: root/apscheduler/abc.py
diff options
context:
space:
mode:
Diffstat (limited to 'apscheduler/abc.py')
-rw-r--r--apscheduler/abc.py159
1 files changed, 74 insertions, 85 deletions
diff --git a/apscheduler/abc.py b/apscheduler/abc.py
index 4bab22b..0acf7fd 100644
--- a/apscheduler/abc.py
+++ b/apscheduler/abc.py
@@ -3,11 +3,10 @@ from base64 import b64decode, b64encode
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import (
- Any, AsyncContextManager, Callable, Dict, FrozenSet, Iterable, Iterator, List, Mapping,
- Optional, Set)
-from uuid import uuid4
+ Any, Callable, Dict, FrozenSet, Iterable, Iterator, List, Optional, Set, Type)
+from uuid import UUID, uuid4
-from apscheduler.events import Event
+from .policies import CoalescePolicy, ConflictPolicy
class Trigger(Iterator[datetime], metaclass=ABCMeta):
@@ -60,7 +59,7 @@ class Schedule:
trigger: Trigger = field(compare=False)
args: tuple = field(compare=False)
kwargs: Dict[str, Any] = field(compare=False)
- coalesce: bool = field(compare=False)
+ coalesce: CoalescePolicy = field(compare=False)
misfire_grace_time: Optional[timedelta] = field(compare=False)
tags: FrozenSet[str] = field(compare=False)
next_fire_time: Optional[datetime] = field(compare=False, default=None)
@@ -76,13 +75,13 @@ class Schedule:
@dataclass(unsafe_hash=True)
class Job:
- id: str = field(init=False, default_factory=lambda: str(uuid4()))
+ id: UUID = field(init=False, default_factory=uuid4)
task_id: str = field(compare=False)
func: Callable = field(compare=False)
args: tuple = field(compare=False)
kwargs: Dict[str, Any] = field(compare=False)
schedule_id: Optional[str] = field(compare=False, default=None)
- scheduled_start_time: Optional[datetime] = field(compare=False, default=None)
+ scheduled_fire_time: Optional[datetime] = field(compare=False, default=None)
start_deadline: Optional[datetime] = field(compare=False, default=None)
tags: Optional[FrozenSet[str]] = field(compare=False, default_factory=frozenset)
started_at: Optional[datetime] = field(init=False, compare=False, default=None)
@@ -106,22 +105,44 @@ class Serializer(metaclass=ABCMeta):
return self.deserialize(b64decode(serialized))
-class EventSource(metaclass=ABCMeta):
- __slots__ = ()
+@dataclass(frozen=True)
+class Event:
+ timestamp: datetime
+
+@dataclass
+class EventSource(metaclass=ABCMeta):
@abstractmethod
- async def subscribe(self, callback: Callable[[Event], Any]) -> None:
+ def subscribe(self, callback: Callable[[Event], Any],
+ event_types: Optional[Iterable[Type[Event]]] = None) -> None:
"""
Subscribe to events from this event source.
:param callback: callable to be called with the event object when an event is published
- :return: an async iterable yielding event objects
+ :param event_types: an iterable of concrete Event classes to subscribe to
"""
+ @abstractmethod
+ def unsubscribe(self, callback: Callable[[Event], Any],
+ event_types: Optional[Iterable[Type[Event]]] = None) -> None:
+ """
+ Cancel an event subscription
-class DataStore(EventSource):
- __slots__ = ()
+ :param callback:
+ :param event_types: an iterable of concrete Event classes to unsubscribe from
+ :return:
+ """
+ @abstractmethod
+ async def publish(self, event: Event) -> None:
+ """
+ Publish an event.
+
+ :param event: the event to publish
+ """
+
+
+class DataStore(EventSource):
async def __aenter__(self):
return self
@@ -138,112 +159,80 @@ class DataStore(EventSource):
"""
@abstractmethod
- async def add_or_replace_schedules(self, schedules: Iterable[Schedule]) -> None:
+ async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
"""
Add or update the given schedule in the data store.
- :param schedules: schedules to be added or updated
+ :param schedule: schedule to be added
+ :param conflict_policy: policy that determines what to do if there is an existing schedule
+ with the same ID
"""
@abstractmethod
- async def update_schedules(self, updates: Mapping[str, Dict[str, Any]]) -> Set[str]:
- """
- Update one or more existing schedules.
-
- :param updates: mapping of schedule ID to attribute names to be updated
- :return: the set of schedule IDs that were modified by this operation.
- """
-
- @abstractmethod
- async def remove_schedules(self, ids: Optional[Set[str]] = None) -> None:
+ async def remove_schedules(self, ids: Iterable[str]) -> None:
"""
Remove schedules from the data store.
- :param ids: a specific set of schedule IDs to remove, or ``None`` in which case all
- schedules are removed
+ :param ids: a specific set of schedule IDs to remove
"""
@abstractmethod
- async def get_next_fire_time(self) -> Optional[datetime]:
+ async def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]:
"""
- Return the earliest fire time among all unclaimed schedules.
+ Acquire unclaimed due schedules for processing.
- If no running, unclaimed schedules exist, ``None`` is returned.
+ This method claims up to the requested number of schedules for the given scheduler and
+ returns them.
+
+ :param scheduler_id: unique identifier of the scheduler
+ :param limit: maximum number of schedules to claim
+ :return: the list of claimed schedules
"""
@abstractmethod
- async def acquire_due_schedules(
- self, scheduler_id: str,
- max_scheduled_time: datetime) -> AsyncContextManager[List[Schedule]]:
+ async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
"""
- Acquire an undefined amount of due schedules not claimed by any other scheduler.
+ Release the claims on the given schedules and update them on the store.
- This method claims due schedules for the given scheduler and returns them.
- When the scheduler has updated the objects, it calls :meth:`release_due_schedules` to
- release the claim on them.
+ :param scheduler_id: unique identifier of the scheduler
+ :param schedules: the previously claimed schedules
"""
- # @abstractmethod
- # async def release_due_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
- # """
- # Update the given schedules and release the claim on them held by this scheduler.
- #
- # This method should do the following:
- #
- # #. Remove any of the schedules in the store that have no next fire time
- # #. Update the schedules that do have a next fire time
- # #. Release any locks held on the schedules by this scheduler
- #
- # :param scheduler_id: identifier of the scheduler
- # :param schedules: schedules previously acquired using :meth:`acquire_due_schedules`
- # """
-
- # @abstractmethod
- # async def acquire_job(self, worker_id: str, tags: Set[str]) -> Job:
- # """
- # Claim and return the next matching job from the queue.
- #
- # :return: the acquired job
- # """
- #
- # @abstractmethod
- # async def release_job(self, job: Job) -> None:
- # """Remove the given job from the queue."""
-
-
-class Executor(EventSource):
@abstractmethod
- async def submit_job(self, job: Job) -> None:
+ async def add_job(self, job: Job) -> None:
"""
- Submit a task to be run in this executor.
-
- The executor may alter the ``id`` attribute of the job before returning.
+ Add a job to be executed by an eligible worker.
:param job: the job object
"""
@abstractmethod
- async def get_jobs(self) -> List[Job]:
+ async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]:
"""
- Get jobs currently queued or running in this executor.
+ Get the list of pending jobs.
- :return: list of jobs
+ :param ids: a specific set of job IDs to return, or ``None`` to return all jobs
+ :return: the list of matching pending jobs, in the order they will be given to workers
"""
+ @abstractmethod
+ async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]:
+ """
+ Acquire unclaimed jobs for execution.
-class EventHub(metaclass=ABCMeta):
- __slots__ = ()
-
- async def start(self) -> None:
- pass
+ This method claims up to the requested number of jobs for the given worker and returns
+ them.
- async def stop(self) -> None:
- pass
+ :param worker_id: unique identifier of the worker
+ :param limit: maximum number of jobs to claim and return
+ :return: the list of claimed jobs
+ """
@abstractmethod
- async def publish(self, event: Event) -> None:
- """Publish an event."""
+ async def release_jobs(self, worker_id: str, jobs: List[Job]) -> None:
+ """
+ Releases the claim on the given jobs
- @abstractmethod
- async def subscribe(self, callback: Callable[[Event], Any]) -> None:
- """Add a callback to be called when a new event is published."""
+ :param worker_id: unique identifier of the worker
+ :param jobs: the previously claimed jobs
+ """