Diffstat (limited to 'docs/extending.rst')
-rw-r--r-- | docs/extending.rst | 174
1 file changed, 71 insertions, 103 deletions
diff --git a/docs/extending.rst b/docs/extending.rst
index 9efe8b7..adde8eb 100644
--- a/docs/extending.rst
+++ b/docs/extending.rst
@@ -2,141 +2,109 @@
 Extending APScheduler
 #####################

-This document is meant to explain how to develop your custom triggers, job stores, executors and
-schedulers.
+This document is meant to explain how to develop your custom triggers and data stores.


 Custom triggers
 ---------------

-The built-in triggers cover the needs of the majority of all users.
-However, some users may need specialized scheduling logic. To that end, the trigger system was made
-pluggable.
+.. py:currentmodule:: apscheduler.triggers

-To implement your scheduling logic, subclass :class:`~apscheduler.triggers.base.BaseTrigger`.
-Look at the interface documentation in that class. Then look at the existing trigger
-implementations. That should give you a good idea what is expected of a trigger implementation.
+The built-in triggers cover the needs of the majority of all users, particularly so when
+combined using :class:`~.combining.AndTrigger` and :class:`~.combining.OrTrigger`.
+However, some users may need specialized scheduling logic. This can be accomplished by
+creating your own custom trigger class.

-To use your trigger, you can use :meth:`~apscheduler.schedulers.base.BaseScheduler.add_job` like
-this::
+To implement your scheduling logic, create a new class that inherits from the
+:class:`~..abc.Trigger` interface class::

-    trigger = MyTrigger(arg1='foo')
-    scheduler.add_job(target, trigger)
+    from __future__ import annotations

-You can also register it as a plugin so you can use the alternate form of
-``add_job``::
+    from apscheduler.abc import Trigger

-    scheduler.add_job(target, 'my_trigger', arg1='foo')
+    class MyCustomTrigger(Trigger):
+        def next(self) -> datetime | None:
+            ...  # Your custom logic here

-This is done by adding an entry point in your project's :file:`setup.py`::
+        def __getstate__(self):
+            ...  # Return the serializable state here

-    ...
-    entry_points={
-        'apscheduler.triggers': ['my_trigger = mytoppackage.subpackage:MyTrigger']
-    }
+        def __setstate__(self, state):
+            ...  # Restore the state from the return value of __getstate__()

+Requirements and constraints for trigger classes:

-Custom job stores
------------------
+* :meth:`~..abc.Trigger.next` must always either return a timezone aware
+  :class:`datetime` object or :data:`None` if a new run time cannot be calculated
+* :meth:`~..abc.Trigger.next` must never return the same :class:`~datetime.datetime`
+  twice and never one that is earlier than the previously returned one
+* :meth:`~..abc.Trigger.__setstate__` must accept the return value of
+  :meth:`~..abc.Trigger.__getstate__` and restore the trigger to the functionally same
+  state as the original

-If you want to store your jobs in a fancy new NoSQL database, or a totally custom datastore, you
-can implement your own job store by subclassing :class:`~apscheduler.jobstores.base.BaseJobStore`.
+Triggers are stateful objects. The :meth:`~..abc.Trigger.next` method is where you
+determine the next run time based on the current state of the trigger. The trigger's
+internal state needs to be updated before returning to ensure that the trigger won't
+return the same datetime on the next call. The trigger code does **not** need to be
+thread-safe.

-A job store typically serializes the :class:`~apscheduler.job.Job` objects given to it, and
-constructs new Job objects from binary data when they are loaded from the backing store. It is
-important that the job store restores the ``_scheduler`` and ``_jobstore_alias`` attribute of any
-Job that it creates. Refer to existing implementations for examples.

-It should be noted that :class:`~apscheduler.jobstores.memory.MemoryJobStore` is special in that it
-does not deserialize the jobs. This comes with its own problems, which it handles in its own way.
-If your job store does serialize jobs, you can of course use a serializer other than pickle.
-You should, however, use the ``__getstate__`` and ``__setstate__`` special methods to respectively
-get and set the Job state. Pickle uses them implicitly.
+Custom data stores
+------------------

-To use your job store, you can add it to the scheduler like this::
+If you want to make use of some external service to store the scheduler data, and it's
+not covered by a built-in data store implementation, you may want to create a custom
+data store class. It should be noted that custom data stores are substantially harder to
+implement than custom triggers.

-    jobstore = MyJobStore()
-    scheduler.add_jobstore(jobstore, 'mystore')
+Data store classes have the following design requirements:

-You can also register it as a plugin so you can use can use the alternate form of
-``add_jobstore``::
+* Must publish the appropriate events to an event broker
+* Code must be thread safe (synchronous API) or task safe (asynchronous API)

-    scheduler.add_jobstore('my_jobstore', 'mystore')
+The data store class needs to inherit from either :class:`~..abc.DataStore` or
+:class:`~..abc.AsyncDataStore`, depending on whether you want to implement the store
+using synchronous or asynchronous APIs:

-This is done by adding an entry point in your project's :file:`setup.py`::
+.. tabs::

-    ...
-    entry_points={
-        'apscheduler.jobstores': ['my_jobstore = mytoppackage.subpackage:MyJobStore']
-    }
+   .. tab:: python Synchronous

+      from apscheduler.abc import DataStore, EventBroker

-Custom executors
-----------------
+      class MyCustomDataStore(DataStore):
+          def start(self, event_broker: EventBroker) -> None:
+              ...  # Save the event broker in a member attribute and initialize the store

-If you need custom logic for executing your jobs, you can create your own executor classes.
-One scenario for this would be if you want to use distributed computing to run your jobs on other
-nodes.
+          def stop(self, *, force: bool = False) -> None:
+              ...  # Shut down the store

-Start by subclassing :class:`~apscheduler.executors.base.BaseExecutor`.
-The responsibilities of an executor are as follows:
+          # See the interface class for the rest of the abstract methods

-* Performing any initialization when ``start()`` is called
-* Releasing any resources when ``shutdown()`` is called
-* Keeping track of the number of instances of each job running on it, and refusing to run more
-  than the maximum
-* Notifying the scheduler of the results of the job
+   .. tab:: python Asynchronous

-If your executor needs to serialize the jobs, make sure you either use pickle for it, or invoke the
-``__getstate__`` and ``__setstate__`` special methods to respectively get and set the Job state.
-Pickle uses them implicitly.
+      from apscheduler.abc import AsyncDataStore, AsyncEventBroker

-To use your executor, you can add it to the scheduler like this::
+      class MyCustomDataStore(AsyncDataStore):
+          async def start(self, event_broker: AsyncEventBroker) -> None:
+              ...  # Save the event broker in a member attribute and initialize the store

-    executor = MyExecutor()
-    scheduler.add_executor(executor, 'myexecutor')
+          async def stop(self, *, force: bool = False) -> None:
+              ...  # Shut down the store

-You can also register it as a plugin so you can use can use the alternate form of
-``add_executor``::
+          # See the interface class for the rest of the abstract methods

-    scheduler.add_executor('my_executor', 'myexecutor')
+Handling temporary failures
++++++++++++++++++++++++++++

-This is done by adding an entry point in your project's :file:`setup.py`::
+If you plan to make the data store implementation public, it is strongly recommended
+that you make an effort to ensure that the implementation can tolerate the loss of
+connectivity to the backing store. The Tenacity_ library is used for this purpose by the
+built-in stores to retry operations in case of a disconnection. If you use it to retry
+operations when exceptions are raised, it is important to only do that in cases of
+*temporary* errors, like connectivity loss, and not in cases like authentication
+failure, missing database and so forth. See the built-in data store implementations and
+Tenacity_ documentation for more information on how to pick the exceptions on which to
+retry the operations.

-    ...
-    entry_points={
-        'apscheduler.executors': ['my_executor = mytoppackage.subpackage:MyExecutor']
-    }
-
-
-Custom schedulers
------------------
-
-A typical situation where you would want to make your own scheduler subclass is when you want to
-integrate it with your
-application framework of choice.
-
-Your custom scheduler should always be a subclass of
-:class:`~apscheduler.schedulers.base.BaseScheduler`. But if you're not adapting to a framework that
-relies on callbacks, consider subclassing
-:class:`~apscheduler.schedulers.blocking.BlockingScheduler` instead.
-
-The most typical extension points for scheduler subclasses are:
- * :meth:`~apscheduler.schedulers.base.BaseScheduler.start`
-   must be overridden to wake up the scheduler for the first time
- * :meth:`~apscheduler.schedulers.base.BaseScheduler.shutdown`
-   must be overridden to release resources allocated during ``start()``
- * :meth:`~apscheduler.schedulers.base.BaseScheduler.wakeup`
-   must be overridden to manage the timer to notify the scheduler of changes in the job store
- * :meth:`~apscheduler.schedulers.base.BaseScheduler._create_lock`
-   override if your framework uses some alternate locking implementation (like gevent)
- * :meth:`~apscheduler.schedulers.base.BaseScheduler._create_default_executor`
-   override if you need to use an alternative default executor
-
-.. important:: Remember to call the superclass implementations of overridden methods, even abstract
-   ones (unless they're empty).
-
-The most important responsibility of the scheduler subclass is to manage the scheduler's sleeping
-based on the return values of ``_process_jobs()``. This can be done in various ways, including
-setting timeouts in ``wakeup()`` or running a blocking loop in ``start()``. Again, see the existing
-scheduler classes for examples.
+.. _Tenacity: https://pypi.org/project/tenacity/
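As a worked illustration of the trigger requirements listed in the new text, here is a minimal sketch of a one-shot trigger written against the ``apscheduler.abc.Trigger`` interface shown above. The class name ``RunOnceAfterDelay`` and its delay-based behaviour are hypothetical, not part of APScheduler itself::

    from __future__ import annotations

    from datetime import datetime, timedelta, timezone

    from apscheduler.abc import Trigger


    class RunOnceAfterDelay(Trigger):
        """Fire exactly once, ``delay`` seconds after the trigger is created."""

        def __init__(self, delay: float):
            self._run_time: datetime | None = datetime.now(timezone.utc) + timedelta(seconds=delay)

        def next(self) -> datetime | None:
            # Hand out the single timezone aware run time once, then None on every
            # later call, so the same (or an earlier) datetime is never returned twice
            run_time, self._run_time = self._run_time, None
            return run_time

        def __getstate__(self):
            return {"run_time": self._run_time}

        def __setstate__(self, state):
            self._run_time = state["run_time"]

Updating ``self._run_time`` before returning is what satisfies the statefulness note above: a second call to ``next()`` gets :data:`None` rather than a repeat of the first run time.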
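The retry advice in the new "Handling temporary failures" section can be followed with a Tenacity decorator along these lines. This is only a sketch, not the code used by the built-in stores: ``TransientConnectionError`` stands in for whatever exception your database driver raises on a lost connection, and the wait and stop values are arbitrary::

    from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


    class TransientConnectionError(Exception):
        """Placeholder for the driver's connection-loss exception."""


    @retry(
        retry=retry_if_exception_type(TransientConnectionError),  # retry only transient failures
        wait=wait_exponential(multiplier=0.5, max=15),  # back off between attempts
        stop=stop_after_attempt(5),  # give up after five attempts
    )
    def write_to_backing_store(operation):
        # Permanent problems such as authentication failures or a missing database are
        # not listed above, so they propagate to the caller on the first attempt
        return operation()

The built-in data stores apply the same idea around their database operations; see their source code and the Tenacity documentation for how they choose the exceptions to retry on.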