summaryrefslogtreecommitdiff
path: root/apscheduler/abc.py
diff options
context:
space:
mode:
Diffstat (limited to 'apscheduler/abc.py')
-rw-r--r--apscheduler/abc.py256
1 files changed, 256 insertions, 0 deletions
diff --git a/apscheduler/abc.py b/apscheduler/abc.py
new file mode 100644
index 0000000..0b7e89a
--- /dev/null
+++ b/apscheduler/abc.py
@@ -0,0 +1,256 @@
+from abc import ABCMeta, abstractmethod
+from base64 import b64encode, b64decode
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta
+from typing import (
+ Callable, Iterable, Iterator, Mapping, Any, NoReturn, Optional, Union, AsyncIterable, Dict,
+ FrozenSet, List, Set)
+
+from .events import Event
+
+
+class Trigger(Iterator[datetime], metaclass=ABCMeta):
+ """Abstract base class that defines the interface that every trigger must implement."""
+
+ __slots__ = ()
+
+ @abstractmethod
+ def next(self) -> Optional[datetime]:
+ """
+ Return the next datetime to fire on.
+
+ If no such datetime can be calculated, ``None`` is returned.
+ :raises apscheduler.exceptions.MaxIterationsReached:
+ """
+
+ @abstractmethod
+ def __getstate__(self):
+ """Return the (JSON compatible) serializable state of the trigger."""
+
+ @abstractmethod
+ def __setstate__(self, state):
+ """Initialize an empty instance from an existing state."""
+
+ def __iter__(self):
+ return self
+
+ def __next__(self) -> datetime:
+ dateval = self.next()
+ if dateval is None:
+ raise StopIteration
+ else:
+ return dateval
+
+
+@dataclass
+class Task:
+ id: str
+ func: Callable
+ max_instances: Optional[int] = None
+ metadata_arg: Optional[str] = None
+ stateful: bool = False
+ misfire_grace_time: Optional[timedelta] = None
+
+
+@dataclass
+class Schedule:
+ id: str
+ task_id: str
+ trigger: Trigger
+ args: tuple = ()
+ kwargs: Dict[str, Any] = field(default_factory=dict)
+ coalesce: bool = True
+ misfire_grace_time: Optional[timedelta] = None
+ tags: Optional[FrozenSet[str]] = frozenset()
+ last_fire_time: Optional[datetime] = field(init=False, default=None)
+ next_fire_time: Optional[datetime] = field(init=False, default=None)
+
+
+@dataclass(frozen=True)
+class Job:
+ func_ref: str
+ args: Optional[tuple] = None
+ kwargs: Optional[Dict[str, Any]] = None
+ schedule_id: Optional[str] = None
+ scheduled_start_time: Optional[datetime] = None
+ start_deadline: Optional[datetime] = None
+ tags: Optional[FrozenSet[str]] = frozenset()
+
+
+class Serializer(metaclass=ABCMeta):
+ __slots__ = ()
+
+ @abstractmethod
+ def serialize(self, obj) -> bytes:
+ pass
+
+ def serialize_to_unicode(self, obj) -> str:
+ return b64encode(self.serialize(obj)).decode('ascii')
+
+ @abstractmethod
+ def deserialize(self, serialized: bytes):
+ pass
+
+ def deserialize_from_unicode(self, serialized: str):
+ return self.deserialize(b64decode(serialized))
+
+
+class DataStore(metaclass=ABCMeta):
+ __slots__ = ()
+
+ async def __aenter__(self):
+ await self.start()
+ return self
+
+ async def __aexit__(self):
+ await self.stop()
+
+ async def start(self) -> None:
+ pass
+
+ async def stop(self) -> None:
+ pass
+
+ @abstractmethod
+ async def add_or_update_schedule(self, schedule: Schedule) -> None:
+ """Add or update the given schedule in the store."""
+
+ @abstractmethod
+ async def remove_schedule(self, schedule_id: str) -> None:
+ """Remove the designated schedule from the store."""
+
+ @abstractmethod
+ async def remove_all_schedules(self) -> None:
+ """Remove all schedules from the store."""
+
+ @abstractmethod
+ async def get_all_schedules(self) -> List[Schedule]:
+ """Get a list of all schedules, sorted on the "id" attribute."""
+
+ @abstractmethod
+ async def get_next_fire_time(self) -> Optional[datetime]:
+ """
+ Return the earliest fire time among all unclaimed schedules.
+
+ If no running, unclaimed schedules exist, ``None`` is returned.
+ """
+
+ @abstractmethod
+ async def acquire_due_schedules(self, scheduler_id: str) -> List[Schedule]:
+ """
+ Acquire an undefined amount of due schedules not claimed by any other scheduler.
+
+ This method claims due schedules for the given scheduler and returns them.
+ When the scheduler has updated the objects, it calls :meth:`release_due_schedules` to
+ release the claim on them.
+ """
+
+ @abstractmethod
+ async def release_due_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
+ """
+ Update the given schedules and release the claim on them held by this scheduler.
+
+ This method should do the following:
+
+ #. Remove any of the schedules in the store that have no next fire time
+ #. Update the schedules that do have a next fire time
+ #. Release any locks held on the schedules by this scheduler
+
+ :param scheduler_id: identifier of the scheduler
+ :param schedules: schedules previously acquired using :meth:`acquire_due_schedules`
+ """
+
+ @abstractmethod
+ async def acquire_job(self, worker_id: str, tags: Set[str]) -> Job:
+ """
+ Claim and return the next matching job from the queue.
+
+ :return: the acquired job
+ """
+
+ @abstractmethod
+ async def release_job(self, job: Job) -> None:
+ """Remove the given job from the queue."""
+
+
+class EventHub(metaclass=ABCMeta):
+ __slots__ = ()
+
+ async def start(self) -> None:
+ pass
+
+ async def stop(self) -> None:
+ pass
+
+ @abstractmethod
+ async def publish(self, event: Event) -> None:
+ """Publish an event."""
+
+ @abstractmethod
+ async def subscribe(self) -> AsyncIterable[Event]:
+ """Return an asynchronous iterable yielding newly received events."""
+
+
+class AsyncScheduler(metaclass=ABCMeta):
+ __slots__ = ()
+
+ @abstractmethod
+ def define_task(self, func: Callable, task_id: Optional[str] = None, *,
+ max_instances: Optional[int],
+ misfire_grace_time: Union[float, timedelta]) -> str:
+ if not task_id:
+ task_id = f'{func.__module__}.{func.__qualname__}'
+ if isinstance(misfire_grace_time, float):
+ misfire_grace_time = timedelta(misfire_grace_time)
+
+ task = Task(id=task_id, func=func, max_instances=max_instances,
+ misfire_grace_time=misfire_grace_time)
+
+ return task_id
+
+ @abstractmethod
+ async def add_schedule(self, task: Union[str, Callable], trigger: Trigger, *, args: Iterable,
+ kwargs: Mapping[str, Any]) -> str:
+ """
+
+
+
+ :param task: callable or ID of a predefined task
+ :param trigger: trigger to define the run times of the schedule
+ :param args: positional arguments to pass to the task callable
+ :param kwargs: keyword arguments to pass to the task callable
+ :return: identifier of the created schedule
+ """
+
+ @abstractmethod
+ async def remove_schedule(self, schedule_id: str) -> None:
+ """Removes the designated schedule."""
+
+ @abstractmethod
+ async def run(self) -> NoReturn:
+ """
+ Runs the scheduler loop.
+
+ This method does not return.
+ """
+
+
+class SyncScheduler(metaclass=ABCMeta):
+ __slots__ = ()
+
+ @abstractmethod
+ def add_schedule(self, task: Callable, trigger: Trigger, *, args: Iterable,
+ kwargs: Mapping[str, Any]) -> str:
+ pass
+
+ @abstractmethod
+ def remove_schedule(self, schedule_id: str) -> None:
+ pass
+
+ @abstractmethod
+ def run(self) -> NoReturn:
+ pass
+
+ add_schedule.__doc__ = AsyncScheduler.add_schedule.__doc__
+ remove_schedule.__doc__ = AsyncScheduler.remove_schedule.__doc__
+ run.__doc__ = AsyncScheduler.run.__doc__