summaryrefslogtreecommitdiff
path: root/src/apscheduler/abc.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-08-29 15:58:06 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-08-29 16:08:49 +0300
commita0b7da22086b1704f52a3a34a3edb25f2bcd5d7e (patch)
treed7b25e7b1f29f968dd0968d2cb0ff511b3c8b56f /src/apscheduler/abc.py
parentc0d757cb8357c71a1187078bb18b246195cb8911 (diff)
downloadapscheduler-a0b7da22086b1704f52a3a34a3edb25f2bcd5d7e.tar.gz
Switched to the src/ layout
Diffstat (limited to 'src/apscheduler/abc.py')
-rw-r--r--src/apscheduler/abc.py278
1 files changed, 278 insertions, 0 deletions
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py
new file mode 100644
index 0000000..2b88a86
--- /dev/null
+++ b/src/apscheduler/abc.py
@@ -0,0 +1,278 @@
+from __future__ import annotations
+
+from abc import ABCMeta, abstractmethod
+from base64 import b64decode, b64encode
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, List, Optional, Set, Type
+from uuid import UUID
+
+from .policies import ConflictPolicy
+from .structures import Job, Schedule
+
+if TYPE_CHECKING:
+ from . import events
+
+
+class Trigger(Iterator[datetime], metaclass=ABCMeta):
+ """Abstract base class that defines the interface that every trigger must implement."""
+
+ __slots__ = ()
+
+ @abstractmethod
+ def next(self) -> Optional[datetime]:
+ """
+ Return the next datetime to fire on.
+
+ If no such datetime can be calculated, ``None`` is returned.
+ :raises apscheduler.exceptions.MaxIterationsReached:
+ """
+
+ @abstractmethod
+ def __getstate__(self):
+ """Return the (JSON compatible) serializable state of the trigger."""
+
+ @abstractmethod
+ def __setstate__(self, state):
+ """Initialize an empty instance from an existing state."""
+
+ def __iter__(self):
+ return self
+
+ def __next__(self) -> datetime:
+ dateval = self.next()
+ if dateval is None:
+ raise StopIteration
+ else:
+ return dateval
+
+
+class Serializer(metaclass=ABCMeta):
+ __slots__ = ()
+
+ @abstractmethod
+ def serialize(self, obj) -> bytes:
+ pass
+
+ def serialize_to_unicode(self, obj) -> str:
+ return b64encode(self.serialize(obj)).decode('ascii')
+
+ @abstractmethod
+ def deserialize(self, serialized: bytes):
+ pass
+
+ def deserialize_from_unicode(self, serialized: str):
+ return self.deserialize(b64decode(serialized))
+
+
+class EventSource(metaclass=ABCMeta):
+ @abstractmethod
+ def subscribe(
+ self, callback: Callable[[events.Event], Any],
+ event_types: Optional[Iterable[Type[events.Event]]] = None
+ ) -> events.SubscriptionToken:
+ """
+ Subscribe to events from this event source.
+
+ :param callback: callable to be called with the event object when an event is published
+ :param event_types: an iterable of concrete Event classes to subscribe to
+ """
+
+ @abstractmethod
+ def unsubscribe(self, token: events.SubscriptionToken) -> None:
+ """
+ Cancel an event subscription.
+
+ :param token: a token returned from :meth:`subscribe`
+ """
+
+
+class DataStore(EventSource):
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ @abstractmethod
+ def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]:
+ """
+ Get schedules from the data store.
+
+ :param ids: a specific set of schedule IDs to return, or ``None`` to return all schedules
+ :return: the list of matching schedules, in unspecified order
+ """
+
+ @abstractmethod
+ def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
+ """
+ Add or update the given schedule in the data store.
+
+ :param schedule: schedule to be added
+ :param conflict_policy: policy that determines what to do if there is an existing schedule
+ with the same ID
+ """
+
+ @abstractmethod
+ def remove_schedules(self, ids: Iterable[str]) -> None:
+ """
+ Remove schedules from the data store.
+
+ :param ids: a specific set of schedule IDs to remove
+ """
+
+ @abstractmethod
+ def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]:
+ """
+ Acquire unclaimed due schedules for processing.
+
+ This method claims up to the requested number of schedules for the given scheduler and
+ returns them.
+
+ :param scheduler_id: unique identifier of the scheduler
+ :param limit: maximum number of schedules to claim
+ :return: the list of claimed schedules
+ """
+
+ @abstractmethod
+ def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
+ """
+ Release the claims on the given schedules and update them on the store.
+
+ :param scheduler_id: unique identifier of the scheduler
+ :param schedules: the previously claimed schedules
+ """
+
+ @abstractmethod
+ def add_job(self, job: Job) -> None:
+ """
+ Add a job to be executed by an eligible worker.
+
+ :param job: the job object
+ """
+
+ @abstractmethod
+ def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]:
+ """
+ Get the list of pending jobs.
+
+ :param ids: a specific set of job IDs to return, or ``None`` to return all jobs
+ :return: the list of matching pending jobs, in the order they will be given to workers
+ """
+
+ @abstractmethod
+ def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]:
+ """
+ Acquire unclaimed jobs for execution.
+
+ This method claims up to the requested number of jobs for the given worker and returns
+ them.
+
+ :param worker_id: unique identifier of the worker
+ :param limit: maximum number of jobs to claim and return
+ :return: the list of claimed jobs
+ """
+
+ @abstractmethod
+ def release_jobs(self, worker_id: str, jobs: List[Job]) -> None:
+ """
+ Releases the claim on the given jobs
+
+ :param worker_id: unique identifier of the worker
+ :param jobs: the previously claimed jobs
+ """
+
+
+class AsyncDataStore(EventSource):
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ @abstractmethod
+ async def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]:
+ """
+ Get schedules from the data store.
+
+ :param ids: a specific set of schedule IDs to return, or ``None`` to return all schedules
+ :return: the list of matching schedules, in unspecified order
+ """
+
+ @abstractmethod
+ async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
+ """
+ Add or update the given schedule in the data store.
+
+ :param schedule: schedule to be added
+ :param conflict_policy: policy that determines what to do if there is an existing schedule
+ with the same ID
+ """
+
+ @abstractmethod
+ async def remove_schedules(self, ids: Iterable[str]) -> None:
+ """
+ Remove schedules from the data store.
+
+ :param ids: a specific set of schedule IDs to remove
+ """
+
+ @abstractmethod
+ async def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]:
+ """
+ Acquire unclaimed due schedules for processing.
+
+ This method claims up to the requested number of schedules for the given scheduler and
+ returns them.
+
+ :param scheduler_id: unique identifier of the scheduler
+ :param limit: maximum number of schedules to claim
+ :return: the list of claimed schedules
+ """
+
+ @abstractmethod
+ async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
+ """
+ Release the claims on the given schedules and update them on the store.
+
+ :param scheduler_id: unique identifier of the scheduler
+ :param schedules: the previously claimed schedules
+ """
+
+ @abstractmethod
+ async def add_job(self, job: Job) -> None:
+ """
+ Add a job to be executed by an eligible worker.
+
+ :param job: the job object
+ """
+
+ @abstractmethod
+ async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]:
+ """
+ Get the list of pending jobs.
+
+ :param ids: a specific set of job IDs to return, or ``None`` to return all jobs
+ :return: the list of matching pending jobs, in the order they will be given to workers
+ """
+
+ @abstractmethod
+ async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]:
+ """
+ Acquire unclaimed jobs for execution.
+
+ This method claims up to the requested number of jobs for the given worker and returns
+ them.
+
+ :param worker_id: unique identifier of the worker
+ :param limit: maximum number of jobs to claim and return
+ :return: the list of claimed jobs
+ """
+
+ @abstractmethod
+ async def release_jobs(self, worker_id: str, jobs: List[Job]) -> None:
+ """
+ Releases the claim on the given jobs
+
+ :param worker_id: unique identifier of the worker
+ :param jobs: the previously claimed jobs
+ """