author     Alex Grönholm <alex.gronholm@nextday.fi>  2022-04-20 01:00:57 +0300
committer  Alex Grönholm <alex.gronholm@nextday.fi>  2022-04-20 01:11:20 +0300
commit     b20f62d929eed84ad18020bb82dd43d8cb70da4d (patch)
tree       c42bf1877dd54755c55c649269e1254995bdf0c9
parent     82992cd427a9ab2351d8e0719b82d826dff5a521 (diff)
download   apscheduler-b20f62d929eed84ad18020bb82dd43d8cb70da4d.tar.gz

Switched to Black for code formatting
-rw-r--r--  .pre-commit-config.yaml  96
-rw-r--r--  docs/conf.py  23
-rw-r--r--  examples/executors/processpool.py  10
-rw-r--r--  examples/misc/reference.py  6
-rw-r--r--  examples/rpc/client.py  6
-rw-r--r--  examples/rpc/server.py  12
-rw-r--r--  examples/schedulers/async_.py  3
-rw-r--r--  examples/schedulers/sync.py  6
-rw-r--r--  pyproject.toml  6
-rw-r--r--  src/apscheduler/abc.py  19
-rw-r--r--  src/apscheduler/context.py  12
-rw-r--r--  src/apscheduler/converters.py  7
-rw-r--r--  src/apscheduler/datastores/async_adapter.py  24
-rw-r--r--  src/apscheduler/datastores/async_sqlalchemy.py  353
-rw-r--r--  src/apscheduler/datastores/memory.py  71
-rw-r--r--  src/apscheduler/datastores/mongodb.py  314
-rw-r--r--  src/apscheduler/datastores/sqlalchemy.py  446
-rw-r--r--  src/apscheduler/eventbrokers/async_adapter.py  4
-rw-r--r--  src/apscheduler/eventbrokers/async_local.py  13
-rw-r--r--  src/apscheduler/eventbrokers/asyncpg.py  16
-rw-r--r--  src/apscheduler/eventbrokers/base.py  44
-rw-r--r--  src/apscheduler/eventbrokers/local.py  29
-rw-r--r--  src/apscheduler/eventbrokers/mqtt.py  22
-rw-r--r--  src/apscheduler/eventbrokers/redis.py  12
-rw-r--r--  src/apscheduler/events.py  8
-rw-r--r--  src/apscheduler/exceptions.py  18
-rw-r--r--  src/apscheduler/marshalling.py  52
-rw-r--r--  src/apscheduler/schedulers/async_.py  173
-rw-r--r--  src/apscheduler/schedulers/sync.py  160
-rw-r--r--  src/apscheduler/serializers/cbor.py  10
-rw-r--r--  src/apscheduler/serializers/json.py  14
-rw-r--r--  src/apscheduler/structures.py  149
-rw-r--r--  src/apscheduler/triggers/calendarinterval.py  47
-rw-r--r--  src/apscheduler/triggers/combining.py  38
-rw-r--r--  src/apscheduler/triggers/cron/__init__.py  128
-rw-r--r--  src/apscheduler/triggers/cron/expressions.py  101
-rw-r--r--  src/apscheduler/triggers/cron/fields.py  77
-rw-r--r--  src/apscheduler/triggers/date.py  10
-rw-r--r--  src/apscheduler/triggers/interval.py  68
-rw-r--r--  src/apscheduler/util.py  16
-rw-r--r--  src/apscheduler/validators.py  58
-rw-r--r--  src/apscheduler/workers/async_.py  54
-rw-r--r--  src/apscheduler/workers/sync.py  41
-rw-r--r--  tests/conftest.py  18
-rw-r--r--  tests/test_datastores.py  331
-rw-r--r--  tests/test_eventbrokers.py  111
-rw-r--r--  tests/test_marshalling.py  81
-rw-r--r--  tests/test_schedulers.py  170
-rw-r--r--  tests/test_workers.py  77
-rw-r--r--  tests/triggers/test_calendarinterval.py  53
-rw-r--r--  tests/triggers/test_combining.py  46
-rw-r--r--  tests/triggers/test_cron.py  407
-rw-r--r--  tests/triggers/test_interval.py  33
53 files changed, 2661 insertions, 1442 deletions
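Illustration (not part of the commit): most of the changes below are mechanical rewrites produced by Black, which normalizes strings to double quotes and collapses or explodes bracketed constructs around its default 88-column limit. A minimal sketch of that behaviour via Black's Python API, run on an invented snippet shaped like the docs/conf.py lines changed below:

import black

# Snippet resembling the pre-Black docs/conf.py lines touched in this commit
source = (
    "project = 'APScheduler'\n"
    "extensions = [\n"
    "    'sphinx.ext.autodoc',\n"
    "    'sphinx.ext.intersphinx'\n"
    "]\n"
)

# format_str() applies the same rules the "black" pre-commit hook runs on each file
print(black.format_str(source, mode=black.Mode()))
# project = "APScheduler"
# extensions = ["sphinx.ext.autodoc", "sphinx.ext.intersphinx"]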
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 6dc6c9f..4b438e0 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,49 +1,59 @@
+# This is the configuration file for pre-commit (https://pre-commit.com/).
+# To use:
+# * Install pre-commit (https://pre-commit.com/#installation)
+# * Copy this file as ".pre-commit-config.yaml"
+# * Run "pre-commit install".
repos:
-- repo: https://github.com/pre-commit/pre-commit-hooks
- rev: v4.2.0
- hooks:
- - id: check-added-large-files
- - id: check-case-conflict
- - id: check-merge-conflict
- - id: check-symlinks
- - id: check-toml
- - id: check-yaml
- - id: debug-statements
- - id: end-of-file-fixer
- - id: mixed-line-ending
- args: ["--fix=lf"]
- - id: trailing-whitespace
+ - repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v4.2.0
+ hooks:
+ - id: check-added-large-files
+ - id: check-case-conflict
+ - id: check-merge-conflict
+ - id: check-symlinks
+ - id: check-toml
+ - id: check-yaml
+ - id: debug-statements
+ - id: end-of-file-fixer
+ - id: mixed-line-ending
+ args: [ "--fix=lf" ]
+ - id: trailing-whitespace
-- repo: https://github.com/pycqa/isort
- rev: 5.10.1
- hooks:
- - id: isort
- args: ["-a", "from __future__ import annotations"]
+ - repo: https://github.com/asottile/pyupgrade
+ rev: v2.32.0
+ hooks:
+ - id: pyupgrade
+ args: [ "--py37-plus" ]
-- repo: https://github.com/asottile/pyupgrade
- rev: v2.32.0
- hooks:
- - id: pyupgrade
- args: ["--py37-plus"]
+ - repo: https://github.com/psf/black
+ rev: 22.3.0
+ hooks:
+ - id: black
-- repo: https://github.com/csachs/pyproject-flake8
- rev: v0.0.1a4
- hooks:
- - id: pyproject-flake8
- additional_dependencies: [flake8-bugbear]
+ - repo: https://github.com/pycqa/isort
+ rev: 5.10.1
+ hooks:
+ - id: isort
+ args: [ "-a", "from __future__ import annotations" ]
-- repo: https://github.com/codespell-project/codespell
- rev: v2.1.0
- hooks:
- - id: codespell
+ - repo: https://github.com/csachs/pyproject-flake8
+ rev: v0.0.1a4
+ hooks:
+ - id: pyproject-flake8
+ additional_dependencies: [ flake8-bugbear ]
-- repo: https://github.com/pre-commit/pygrep-hooks
- rev: v1.9.0
- hooks:
- - id: python-check-blanket-noqa
- - id: python-check-blanket-type-ignore
- - id: python-no-eval
- - id: python-use-type-annotations
- - id: rst-backticks
- - id: rst-directive-colons
- - id: rst-inline-touching-normal
+ - repo: https://github.com/codespell-project/codespell
+ rev: v2.1.0
+ hooks:
+ - id: codespell
+
+ - repo: https://github.com/pre-commit/pygrep-hooks
+ rev: v1.9.0
+ hooks:
+ - id: python-check-blanket-noqa
+ - id: python-check-blanket-type-ignore
+ - id: python-no-eval
+ - id: python-use-type-annotations
+ - id: rst-backticks
+ - id: rst-directive-colons
+ - id: rst-inline-touching-normal
diff --git a/docs/conf.py b/docs/conf.py
index 2b78efa..e1972b3 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -18,9 +18,9 @@ from __future__ import annotations
# -- Project information -----------------------------------------------------
-project = 'APScheduler'
-copyright = 'Alex Grönholm'
-author = 'Alex Grönholm'
+project = "APScheduler"
+copyright = "Alex Grönholm"
+author = "Alex Grönholm"
# -- General configuration ---------------------------------------------------
@@ -28,13 +28,10 @@ author = 'Alex Grönholm'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
-extensions = [
- 'sphinx.ext.autodoc',
- 'sphinx.ext.intersphinx'
-]
+extensions = ["sphinx.ext.autodoc", "sphinx.ext.intersphinx"]
# Add any paths that contain templates here, relative to this directory.
-templates_path = ['_templates']
+templates_path = ["_templates"]
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
@@ -47,12 +44,14 @@ exclude_patterns = []
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
-html_theme = 'sphinx_rtd_theme'
+html_theme = "sphinx_rtd_theme"
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
+html_static_path = ["_static"]
-intersphinx_mapping = {'python': ('https://docs.python.org/', None),
- 'sqlalchemy': ('http://docs.sqlalchemy.org/en/latest/', None)}
+intersphinx_mapping = {
+ "python": ("https://docs.python.org/", None),
+ "sqlalchemy": ("http://docs.sqlalchemy.org/en/latest/", None),
+}
diff --git a/examples/executors/processpool.py b/examples/executors/processpool.py
index c2b19d5..f8538e1 100644
--- a/examples/executors/processpool.py
+++ b/examples/executors/processpool.py
@@ -11,14 +11,14 @@ from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
- print('Tick! The time is: %s' % datetime.now())
+ print("Tick! The time is: %s" % datetime.now())
-if __name__ == '__main__':
+if __name__ == "__main__":
scheduler = BlockingScheduler()
- scheduler.add_executor('processpool')
- scheduler.add_job(tick, 'interval', seconds=3)
- print('Press Ctrl+{} to exit'.format('Break' if os.name == 'nt' else 'C'))
+ scheduler.add_executor("processpool")
+ scheduler.add_job(tick, "interval", seconds=3)
+ print("Press Ctrl+{} to exit".format("Break" if os.name == "nt" else "C"))
try:
scheduler.initialize()
diff --git a/examples/misc/reference.py b/examples/misc/reference.py
index 0005e1a..75bfb1c 100644
--- a/examples/misc/reference.py
+++ b/examples/misc/reference.py
@@ -8,10 +8,10 @@ import os
from apscheduler.schedulers.blocking import BlockingScheduler
-if __name__ == '__main__':
+if __name__ == "__main__":
scheduler = BlockingScheduler()
- scheduler.add_job('sys:stdout.write', 'interval', seconds=3, args=['tick\n'])
- print('Press Ctrl+{} to exit'.format('Break' if os.name == 'nt' else 'C'))
+ scheduler.add_job("sys:stdout.write", "interval", seconds=3, args=["tick\n"])
+ print("Press Ctrl+{} to exit".format("Break" if os.name == "nt" else "C"))
try:
scheduler.initialize()
diff --git a/examples/rpc/client.py b/examples/rpc/client.py
index 234446b..2a3a52a 100644
--- a/examples/rpc/client.py
+++ b/examples/rpc/client.py
@@ -12,7 +12,9 @@ from time import sleep
import rpyc
-conn = rpyc.connect('localhost', 12345)
-job = conn.root.add_job('server:print_text', 'interval', args=['Hello, World'], seconds=2)
+conn = rpyc.connect("localhost", 12345)
+job = conn.root.add_job(
+ "server:print_text", "interval", args=["Hello, World"], seconds=2
+)
sleep(10)
conn.root.remove_job(job.id)
diff --git a/examples/rpc/server.py b/examples/rpc/server.py
index f3a195c..fc40c04 100644
--- a/examples/rpc/server.py
+++ b/examples/rpc/server.py
@@ -26,7 +26,9 @@ class SchedulerService(rpyc.Service):
def exposed_modify_job(self, job_id, jobstore=None, **changes):
return scheduler.modify_job(job_id, jobstore, **changes)
- def exposed_reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
+ def exposed_reschedule_job(
+ self, job_id, jobstore=None, trigger=None, **trigger_args
+ ):
return scheduler.reschedule_job(job_id, jobstore, trigger, **trigger_args)
def exposed_pause_job(self, job_id, jobstore=None):
@@ -45,11 +47,13 @@ class SchedulerService(rpyc.Service):
return scheduler.get_jobs(jobstore)
-if __name__ == '__main__':
+if __name__ == "__main__":
scheduler = BackgroundScheduler()
scheduler.initialize()
- protocol_config = {'allow_public_attrs': True}
- server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
+ protocol_config = {"allow_public_attrs": True}
+ server = ThreadedServer(
+ SchedulerService, port=12345, protocol_config=protocol_config
+ )
try:
server.initialize()
except (KeyboardInterrupt, SystemExit):
diff --git a/examples/schedulers/async_.py b/examples/schedulers/async_.py
index 5ea9dca..e4eac0c 100644
--- a/examples/schedulers/async_.py
+++ b/examples/schedulers/async_.py
@@ -10,7 +10,7 @@ from apscheduler.workers.async_ import AsyncWorker
def say_hello():
- print('Hello!')
+ print("Hello!")
async def main():
@@ -18,6 +18,7 @@ async def main():
await scheduler.add_schedule(say_hello, IntervalTrigger(seconds=1))
await scheduler.wait_until_stopped()
+
logging.basicConfig(level=logging.DEBUG)
try:
anyio.run(main)
diff --git a/examples/schedulers/sync.py b/examples/schedulers/sync.py
index 369c535..12ae17f 100644
--- a/examples/schedulers/sync.py
+++ b/examples/schedulers/sync.py
@@ -8,12 +8,14 @@ from apscheduler.workers.sync import Worker
def say_hello():
- print('Hello!')
+ print("Hello!")
logging.basicConfig(level=logging.DEBUG)
try:
- with Scheduler() as scheduler, Worker(scheduler.data_store, portal=scheduler.portal):
+ with Scheduler() as scheduler, Worker(
+ scheduler.data_store, portal=scheduler.portal
+ ):
scheduler.add_schedule(say_hello, IntervalTrigger(seconds=1))
scheduler.wait_until_stopped()
except (KeyboardInterrupt, SystemExit):
diff --git a/pyproject.toml b/pyproject.toml
index 722b50b..6ee7ff4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,11 +27,7 @@ show_missing = true
[tool.isort]
src_paths = ["src"]
skip_gitignore = true
-line_length = 99
-multi_line_output = 4
-
-[tool.autopep8]
-max_line_length = 99
+profile = "black"
[tool.flake8]
max-line-length = 99
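Note on the pyproject.toml hunk above: replacing the hand-tuned line_length and multi_line_output settings with profile = "black" makes isort emit import blocks that Black will not re-wrap. A rough sketch of the effect via isort's Python API, using an over-long import line invented here (not taken from the commit):

import isort

# Deliberately over-long import, similar in shape to those reformatted below
messy = (
    "from apscheduler.events import TaskUpdated, TaskRemoved, TaskAdded, "
    "ScheduleUpdated, ScheduleRemoved, ScheduleAdded\n"
)

# profile="black" implies an 88-column limit, parenthesized wrapping and trailing commas
print(isort.code(messy, profile="black"))
# from apscheduler.events import (
#     ScheduleAdded,
#     ScheduleRemoved,
#     ScheduleUpdated,
#     TaskAdded,
#     TaskRemoved,
#     TaskUpdated,
# )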
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py
index 929a8cc..00bb281 100644
--- a/src/apscheduler/abc.py
+++ b/src/apscheduler/abc.py
@@ -54,7 +54,7 @@ class Serializer(metaclass=ABCMeta):
pass
def serialize_to_unicode(self, obj) -> str:
- return b64encode(self.serialize(obj)).decode('ascii')
+ return b64encode(self.serialize(obj)).decode("ascii")
@abstractmethod
def deserialize(self, serialized: bytes):
@@ -91,10 +91,11 @@ class EventSource(metaclass=ABCMeta):
@abstractmethod
def subscribe(
- self, callback: Callable[[Event], Any],
+ self,
+ callback: Callable[[Event], Any],
event_types: Iterable[type[Event]] | None = None,
*,
- one_shot: bool = False
+ one_shot: bool = False,
) -> Subscription:
"""
Subscribe to events from this event source.
@@ -367,7 +368,9 @@ class AsyncDataStore:
"""
@abstractmethod
- async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
+ async def add_schedule(
+ self, schedule: Schedule, conflict_policy: ConflictPolicy
+ ) -> None:
"""
Add or update the given schedule in the data store.
@@ -398,7 +401,9 @@ class AsyncDataStore:
"""
@abstractmethod
- async def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None:
+ async def release_schedules(
+ self, scheduler_id: str, schedules: list[Schedule]
+ ) -> None:
"""
Release the claims on the given schedules and update them on the store.
@@ -444,7 +449,9 @@ class AsyncDataStore:
"""
@abstractmethod
- async def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
+ async def release_job(
+ self, worker_id: str, task_id: str, result: JobResult
+ ) -> None:
"""
Release the claim on the given job and record the result.
diff --git a/src/apscheduler/context.py b/src/apscheduler/context.py
index aa9977e..42ccf87 100644
--- a/src/apscheduler/context.py
+++ b/src/apscheduler/context.py
@@ -11,10 +11,12 @@ if TYPE_CHECKING:
from .workers.sync import Worker
#: The currently running (local) scheduler
-current_scheduler: ContextVar[Scheduler | AsyncScheduler | None] = ContextVar('current_scheduler',
- default=None)
+current_scheduler: ContextVar[Scheduler | AsyncScheduler | None] = ContextVar(
+ "current_scheduler", default=None
+)
#: The worker running the current job
-current_worker: ContextVar[Worker | AsyncWorker | None] = ContextVar('current_worker',
- default=None)
+current_worker: ContextVar[Worker | AsyncWorker | None] = ContextVar(
+ "current_worker", default=None
+)
#: Metadata about the current job
-job_info: ContextVar[JobInfo] = ContextVar('job_info')
+job_info: ContextVar[JobInfo] = ContextVar("job_info")
diff --git a/src/apscheduler/converters.py b/src/apscheduler/converters.py
index 4b9feea..7502327 100644
--- a/src/apscheduler/converters.py
+++ b/src/apscheduler/converters.py
@@ -8,15 +8,15 @@ from uuid import UUID
from . import abc
-TEnum = TypeVar('TEnum', bound=Enum)
+TEnum = TypeVar("TEnum", bound=Enum)
def as_aware_datetime(value: datetime | str) -> datetime:
"""Convert the value from a string to a timezone aware datetime."""
if isinstance(value, str):
# fromisoformat() does not handle the "Z" suffix
- if value.upper().endswith('Z'):
- value = value[:-1] + '+00:00'
+ if value.upper().endswith("Z"):
+ value = value[:-1] + "+00:00"
value = datetime.fromisoformat(value)
@@ -51,6 +51,7 @@ def as_enum(enum_class: type[TEnum]) -> Callable[[TEnum | str], TEnum]:
def as_async_datastore(value: abc.DataStore | abc.AsyncDataStore) -> abc.AsyncDataStore:
if isinstance(value, abc.DataStore):
from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
+
return AsyncDataStoreAdapter(value)
return value
diff --git a/src/apscheduler/datastores/async_adapter.py b/src/apscheduler/datastores/async_adapter.py
index 945851f..3b2342e 100644
--- a/src/apscheduler/datastores/async_adapter.py
+++ b/src/apscheduler/datastores/async_adapter.py
@@ -39,7 +39,9 @@ class AsyncDataStoreAdapter(AsyncDataStore):
await self._exit_stack.enter_async_context(self._events)
await to_thread.run_sync(self.original.__enter__)
- self._exit_stack.push_async_exit(partial(to_thread.run_sync, self.original.__exit__))
+ self._exit_stack.push_async_exit(
+ partial(to_thread.run_sync, self.original.__exit__)
+ )
return self
@@ -61,17 +63,25 @@ class AsyncDataStoreAdapter(AsyncDataStore):
async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
return await to_thread.run_sync(self.original.get_schedules, ids)
- async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
+ async def add_schedule(
+ self, schedule: Schedule, conflict_policy: ConflictPolicy
+ ) -> None:
await to_thread.run_sync(self.original.add_schedule, schedule, conflict_policy)
async def remove_schedules(self, ids: Iterable[str]) -> None:
await to_thread.run_sync(self.original.remove_schedules, ids)
async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
- return await to_thread.run_sync(self.original.acquire_schedules, scheduler_id, limit)
+ return await to_thread.run_sync(
+ self.original.acquire_schedules, scheduler_id, limit
+ )
- async def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None:
- await to_thread.run_sync(self.original.release_schedules, scheduler_id, schedules)
+ async def release_schedules(
+ self, scheduler_id: str, schedules: list[Schedule]
+ ) -> None:
+ await to_thread.run_sync(
+ self.original.release_schedules, scheduler_id, schedules
+ )
async def get_next_schedule_run_time(self) -> datetime | None:
return await to_thread.run_sync(self.original.get_next_schedule_run_time)
@@ -85,7 +95,9 @@ class AsyncDataStoreAdapter(AsyncDataStore):
async def acquire_jobs(self, worker_id: str, limit: int | None = None) -> list[Job]:
return await to_thread.run_sync(self.original.acquire_jobs, worker_id, limit)
- async def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
+ async def release_job(
+ self, worker_id: str, task_id: str, result: JobResult
+ ) -> None:
await to_thread.run_sync(self.original.release_job, worker_id, task_id, result)
async def get_job_result(self, job_id: UUID) -> JobResult | None:
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index ca97330..89163fa 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -21,9 +21,18 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule
from ..enums import ConflictPolicy
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- DataStoreEvent, JobAcquired, JobAdded, JobDeserializationFailed, ScheduleAdded,
- ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved,
- TaskUpdated)
+ DataStoreEvent,
+ JobAcquired,
+ JobAdded,
+ JobDeserializationFailed,
+ ScheduleAdded,
+ ScheduleDeserializationFailed,
+ ScheduleRemoved,
+ ScheduleUpdated,
+ TaskAdded,
+ TaskRemoved,
+ TaskUpdated,
+)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..structures import JobResult, Task
@@ -50,14 +59,20 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
# Construct the Tenacity retry controller
# OSError is raised by asyncpg if it can't connect
self._retrying = tenacity.AsyncRetrying(
- stop=self.retry_settings.stop, wait=self.retry_settings.wait,
+ stop=self.retry_settings.stop,
+ wait=self.retry_settings.wait,
retry=tenacity.retry_if_exception_type((InterfaceError, OSError)),
- after=self._after_attempt, sleep=anyio.sleep, reraise=True)
+ after=self._after_attempt,
+ sleep=anyio.sleep,
+ reraise=True,
+ )
async def __aenter__(self):
- asynclib = sniffio.current_async_library() or '(unknown)'
- if asynclib != 'asyncio':
- raise RuntimeError(f'This data store requires asyncio; currently running: {asynclib}')
+ asynclib = sniffio.current_async_library() or "(unknown)"
+ if asynclib != "asyncio":
+ raise RuntimeError(
+ f"This data store requires asyncio; currently running: {asynclib}"
+ )
# Verify that the schema is in place
async for attempt in self._retrying:
@@ -72,11 +87,14 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
result = await conn.execute(query)
version = result.scalar()
if version is None:
- await conn.execute(self.t_metadata.insert(values={'schema_version': 1}))
+ await conn.execute(
+ self.t_metadata.insert(values={"schema_version": 1})
+ )
elif version > 1:
raise RuntimeError(
- f'Unexpected schema version ({version}); '
- f'only version 1 is supported by this version of APScheduler')
+ f"Unexpected schema version ({version}); "
+ f"only version 1 is supported by this version of APScheduler"
+ )
await self._events.__aenter__()
return self
@@ -95,7 +113,8 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
schedules.append(Schedule.unmarshal(self.serializer, row._asdict()))
except SerializationError as exc:
await self._events.publish(
- ScheduleDeserializationFailed(schedule_id=row['id'], exception=exc))
+ ScheduleDeserializationFailed(schedule_id=row["id"], exception=exc)
+ )
return schedules
@@ -106,25 +125,33 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
jobs.append(Job.unmarshal(self.serializer, row._asdict()))
except SerializationError as exc:
await self._events.publish(
- JobDeserializationFailed(job_id=row['id'], exception=exc))
+ JobDeserializationFailed(job_id=row["id"], exception=exc)
+ )
return jobs
async def add_task(self, task: Task) -> None:
- insert = self.t_tasks.insert().\
- values(id=task.id, func=callable_to_ref(task.func),
- max_running_jobs=task.max_running_jobs,
- misfire_grace_time=task.misfire_grace_time)
+ insert = self.t_tasks.insert().values(
+ id=task.id,
+ func=callable_to_ref(task.func),
+ max_running_jobs=task.max_running_jobs,
+ misfire_grace_time=task.misfire_grace_time,
+ )
try:
async for attempt in self._retrying:
with attempt:
async with self.engine.begin() as conn:
await conn.execute(insert)
except IntegrityError:
- update = self.t_tasks.update().\
- values(func=callable_to_ref(task.func), max_running_jobs=task.max_running_jobs,
- misfire_grace_time=task.misfire_grace_time).\
- where(self.t_tasks.c.id == task.id)
+ update = (
+ self.t_tasks.update()
+ .values(
+ func=callable_to_ref(task.func),
+ max_running_jobs=task.max_running_jobs,
+ misfire_grace_time=task.misfire_grace_time,
+ )
+ .where(self.t_tasks.c.id == task.id)
+ )
async for attempt in self._retrying:
with attempt:
async with self.engine.begin() as conn:
@@ -146,9 +173,15 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
await self._events.publish(TaskRemoved(task_id=task_id))
async def get_task(self, task_id: str) -> Task:
- query = select([self.t_tasks.c.id, self.t_tasks.c.func, self.t_tasks.c.max_running_jobs,
- self.t_tasks.c.state, self.t_tasks.c.misfire_grace_time]).\
- where(self.t_tasks.c.id == task_id)
+ query = select(
+ [
+ self.t_tasks.c.id,
+ self.t_tasks.c.func,
+ self.t_tasks.c.max_running_jobs,
+ self.t_tasks.c.state,
+ self.t_tasks.c.misfire_grace_time,
+ ]
+ ).where(self.t_tasks.c.id == task_id)
async for attempt in self._retrying:
with attempt:
async with self.engine.begin() as conn:
@@ -161,17 +194,27 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
raise TaskLookupError
async def get_tasks(self) -> list[Task]:
- query = select([self.t_tasks.c.id, self.t_tasks.c.func, self.t_tasks.c.max_running_jobs,
- self.t_tasks.c.state, self.t_tasks.c.misfire_grace_time]).\
- order_by(self.t_tasks.c.id)
+ query = select(
+ [
+ self.t_tasks.c.id,
+ self.t_tasks.c.func,
+ self.t_tasks.c.max_running_jobs,
+ self.t_tasks.c.state,
+ self.t_tasks.c.misfire_grace_time,
+ ]
+ ).order_by(self.t_tasks.c.id)
async for attempt in self._retrying:
with attempt:
async with self.engine.begin() as conn:
result = await conn.execute(query)
- tasks = [Task.unmarshal(self.serializer, row._asdict()) for row in result]
+ tasks = [
+ Task.unmarshal(self.serializer, row._asdict()) for row in result
+ ]
return tasks
- async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
+ async def add_schedule(
+ self, schedule: Schedule, conflict_policy: ConflictPolicy
+ ) -> None:
event: DataStoreEvent
values = schedule.marshal(self.serializer)
insert = self.t_schedules.insert().values(**values)
@@ -184,30 +227,38 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
if conflict_policy is ConflictPolicy.exception:
raise ConflictingIdError(schedule.id) from None
elif conflict_policy is ConflictPolicy.replace:
- del values['id']
- update = self.t_schedules.update().\
- where(self.t_schedules.c.id == schedule.id).\
- values(**values)
+ del values["id"]
+ update = (
+ self.t_schedules.update()
+ .where(self.t_schedules.c.id == schedule.id)
+ .values(**values)
+ )
async for attempt in self._retrying:
async with attempt, self.engine.begin() as conn:
await conn.execute(update)
- event = ScheduleUpdated(schedule_id=schedule.id,
- next_fire_time=schedule.next_fire_time)
+ event = ScheduleUpdated(
+ schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
+ )
await self._events.publish(event)
else:
- event = ScheduleAdded(schedule_id=schedule.id,
- next_fire_time=schedule.next_fire_time)
+ event = ScheduleAdded(
+ schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
+ )
await self._events.publish(event)
async def remove_schedules(self, ids: Iterable[str]) -> None:
async for attempt in self._retrying:
with attempt:
async with self.engine.begin() as conn:
- delete = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids))
+ delete = self.t_schedules.delete().where(
+ self.t_schedules.c.id.in_(ids)
+ )
if self._supports_update_returning:
delete = delete.returning(self.t_schedules.c.id)
- removed_ids: Iterable[str] = [row[0] for row in await conn.execute(delete)]
+ removed_ids: Iterable[str] = [
+ row[0] for row in await conn.execute(delete)
+ ]
else:
# TODO: actually check which rows were deleted?
await conn.execute(delete)
@@ -233,31 +284,46 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
async with self.engine.begin() as conn:
now = datetime.now(timezone.utc)
acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
- schedules_cte = select(self.t_schedules.c.id).\
- where(and_(self.t_schedules.c.next_fire_time.isnot(None),
- self.t_schedules.c.next_fire_time <= now,
- or_(self.t_schedules.c.acquired_until.is_(None),
- self.t_schedules.c.acquired_until < now))).\
- order_by(self.t_schedules.c.next_fire_time).\
- limit(limit).with_for_update(skip_locked=True).cte()
+ schedules_cte = (
+ select(self.t_schedules.c.id)
+ .where(
+ and_(
+ self.t_schedules.c.next_fire_time.isnot(None),
+ self.t_schedules.c.next_fire_time <= now,
+ or_(
+ self.t_schedules.c.acquired_until.is_(None),
+ self.t_schedules.c.acquired_until < now,
+ ),
+ )
+ )
+ .order_by(self.t_schedules.c.next_fire_time)
+ .limit(limit)
+ .with_for_update(skip_locked=True)
+ .cte()
+ )
subselect = select([schedules_cte.c.id])
- update = self.t_schedules.update().\
- where(self.t_schedules.c.id.in_(subselect)).\
- values(acquired_by=scheduler_id, acquired_until=acquired_until)
+ update = (
+ self.t_schedules.update()
+ .where(self.t_schedules.c.id.in_(subselect))
+ .values(acquired_by=scheduler_id, acquired_until=acquired_until)
+ )
if self._supports_update_returning:
update = update.returning(*self.t_schedules.columns)
result = await conn.execute(update)
else:
await conn.execute(update)
- query = self.t_schedules.select().\
- where(and_(self.t_schedules.c.acquired_by == scheduler_id))
+ query = self.t_schedules.select().where(
+ and_(self.t_schedules.c.acquired_by == scheduler_id)
+ )
result = conn.execute(query)
schedules = await self._deserialize_schedules(result)
return schedules
- async def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None:
+ async def release_schedules(
+ self, scheduler_id: str, schedules: list[Schedule]
+ ) -> None:
async for attempt in self._retrying:
with attempt:
async with self.engine.begin() as conn:
@@ -267,52 +333,74 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
for schedule in schedules:
if schedule.next_fire_time is not None:
try:
- serialized_trigger = self.serializer.serialize(schedule.trigger)
+ serialized_trigger = self.serializer.serialize(
+ schedule.trigger
+ )
except SerializationError:
self._logger.exception(
- 'Error serializing trigger for schedule %r – '
- 'removing from data store', schedule.id)
+ "Error serializing trigger for schedule %r – "
+ "removing from data store",
+ schedule.id,
+ )
finished_schedule_ids.append(schedule.id)
continue
- update_args.append({
- 'p_id': schedule.id,
- 'p_trigger': serialized_trigger,
- 'p_next_fire_time': schedule.next_fire_time
- })
+ update_args.append(
+ {
+ "p_id": schedule.id,
+ "p_trigger": serialized_trigger,
+ "p_next_fire_time": schedule.next_fire_time,
+ }
+ )
else:
finished_schedule_ids.append(schedule.id)
# Update schedules that have a next fire time
if update_args:
- p_id: BindParameter = bindparam('p_id')
- p_trigger: BindParameter = bindparam('p_trigger')
- p_next_fire_time: BindParameter = bindparam('p_next_fire_time')
- update = self.t_schedules.update().\
- where(and_(self.t_schedules.c.id == p_id,
- self.t_schedules.c.acquired_by == scheduler_id)).\
- values(trigger=p_trigger, next_fire_time=p_next_fire_time,
- acquired_by=None, acquired_until=None)
- next_fire_times = {arg['p_id']: arg['p_next_fire_time']
- for arg in update_args}
+ p_id: BindParameter = bindparam("p_id")
+ p_trigger: BindParameter = bindparam("p_trigger")
+ p_next_fire_time: BindParameter = bindparam("p_next_fire_time")
+ update = (
+ self.t_schedules.update()
+ .where(
+ and_(
+ self.t_schedules.c.id == p_id,
+ self.t_schedules.c.acquired_by == scheduler_id,
+ )
+ )
+ .values(
+ trigger=p_trigger,
+ next_fire_time=p_next_fire_time,
+ acquired_by=None,
+ acquired_until=None,
+ )
+ )
+ next_fire_times = {
+ arg["p_id"]: arg["p_next_fire_time"] for arg in update_args
+ }
if self._supports_update_returning:
update = update.returning(self.t_schedules.c.id)
updated_ids = [
- row[0] for row in await conn.execute(update, update_args)]
+ row[0]
+ for row in await conn.execute(update, update_args)
+ ]
else:
# TODO: actually check which rows were updated?
await conn.execute(update, update_args)
updated_ids = list(next_fire_times)
for schedule_id in updated_ids:
- event = ScheduleUpdated(schedule_id=schedule_id,
- next_fire_time=next_fire_times[schedule_id])
+ event = ScheduleUpdated(
+ schedule_id=schedule_id,
+ next_fire_time=next_fire_times[schedule_id],
+ )
update_events.append(event)
# Remove schedules that have no next fire time or failed to serialize
if finished_schedule_ids:
- delete = self.t_schedules.delete().\
- where(self.t_schedules.c.id.in_(finished_schedule_ids))
+ delete = self.t_schedules.delete().where(
+ self.t_schedules.c.id.in_(finished_schedule_ids)
+ )
await conn.execute(delete)
for event in update_events:
@@ -322,10 +410,12 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
await self._events.publish(ScheduleRemoved(schedule_id=schedule_id))
async def get_next_schedule_run_time(self) -> datetime | None:
- statenent = select(self.t_schedules.c.next_fire_time).\
- where(self.t_schedules.c.next_fire_time.isnot(None)).\
- order_by(self.t_schedules.c.next_fire_time).\
- limit(1)
+ statenent = (
+ select(self.t_schedules.c.next_fire_time)
+ .where(self.t_schedules.c.next_fire_time.isnot(None))
+ .order_by(self.t_schedules.c.next_fire_time)
+ .limit(1)
+ )
async for attempt in self._retrying:
with attempt:
async with self.engine.begin() as conn:
@@ -340,8 +430,12 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
async with self.engine.begin() as conn:
await conn.execute(insert)
- event = JobAdded(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
- tags=job.tags)
+ event = JobAdded(
+ job_id=job.id,
+ task_id=job.task_id,
+ schedule_id=job.schedule_id,
+ tags=job.tags,
+ )
await self._events.publish(event)
async def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]:
@@ -362,13 +456,19 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
async with self.engine.begin() as conn:
now = datetime.now(timezone.utc)
acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
- query = self.t_jobs.select().\
- join(self.t_tasks, self.t_tasks.c.id == self.t_jobs.c.task_id).\
- where(or_(self.t_jobs.c.acquired_until.is_(None),
- self.t_jobs.c.acquired_until < now)).\
- order_by(self.t_jobs.c.created_at).\
- with_for_update(skip_locked=True).\
- limit(limit)
+ query = (
+ self.t_jobs.select()
+ .join(self.t_tasks, self.t_tasks.c.id == self.t_jobs.c.task_id)
+ .where(
+ or_(
+ self.t_jobs.c.acquired_until.is_(None),
+ self.t_jobs.c.acquired_until < now,
+ )
+ )
+ .order_by(self.t_jobs.c.created_at)
+ .with_for_update(skip_locked=True)
+ .limit(limit)
+ )
result = await conn.execute(query)
if not result:
@@ -379,11 +479,16 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
task_ids: set[str] = {job.task_id for job in jobs}
# Retrieve the limits
- query = select([
- self.t_tasks.c.id,
- self.t_tasks.c.max_running_jobs - self.t_tasks.c.running_jobs]).\
- where(self.t_tasks.c.max_running_jobs.isnot(None),
- self.t_tasks.c.id.in_(task_ids))
+ query = select(
+ [
+ self.t_tasks.c.id,
+ self.t_tasks.c.max_running_jobs
+ - self.t_tasks.c.running_jobs,
+ ]
+ ).where(
+ self.t_tasks.c.max_running_jobs.isnot(None),
+ self.t_tasks.c.id.in_(task_ids),
+ )
result = await conn.execute(query)
job_slots_left: dict[str, int] = dict(result.fetchall())
@@ -404,19 +509,29 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
if acquired_jobs:
# Mark the acquired jobs as acquired by this worker
acquired_job_ids = [job.id for job in acquired_jobs]
- update = self.t_jobs.update().\
- values(acquired_by=worker_id, acquired_until=acquired_until).\
- where(self.t_jobs.c.id.in_(acquired_job_ids))
+ update = (
+ self.t_jobs.update()
+ .values(
+ acquired_by=worker_id, acquired_until=acquired_until
+ )
+ .where(self.t_jobs.c.id.in_(acquired_job_ids))
+ )
await conn.execute(update)
# Increment the running job counters on each task
- p_id: BindParameter = bindparam('p_id')
- p_increment: BindParameter = bindparam('p_increment')
- params = [{'p_id': task_id, 'p_increment': increment}
- for task_id, increment in increments.items()]
- update = self.t_tasks.update().\
- values(running_jobs=self.t_tasks.c.running_jobs + p_increment).\
- where(self.t_tasks.c.id == p_id)
+ p_id: BindParameter = bindparam("p_id")
+ p_increment: BindParameter = bindparam("p_increment")
+ params = [
+ {"p_id": task_id, "p_increment": increment}
+ for task_id, increment in increments.items()
+ ]
+ update = (
+ self.t_tasks.update()
+ .values(
+ running_jobs=self.t_tasks.c.running_jobs + p_increment
+ )
+ .where(self.t_tasks.c.id == p_id)
+ )
await conn.execute(update, params)
# Publish the appropriate events
@@ -425,7 +540,9 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
return acquired_jobs
- async def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
+ async def release_job(
+ self, worker_id: str, task_id: str, result: JobResult
+ ) -> None:
async for attempt in self._retrying:
with attempt:
async with self.engine.begin() as conn:
@@ -435,13 +552,17 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
await conn.execute(insert)
# Decrement the running jobs counter
- update = self.t_tasks.update().\
- values(running_jobs=self.t_tasks.c.running_jobs - 1).\
- where(self.t_tasks.c.id == task_id)
+ update = (
+ self.t_tasks.update()
+ .values(running_jobs=self.t_tasks.c.running_jobs - 1)
+ .where(self.t_tasks.c.id == task_id)
+ )
await conn.execute(update)
# Delete the job
- delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id)
+ delete = self.t_jobs.delete().where(
+ self.t_jobs.c.id == result.job_id
+ )
await conn.execute(delete)
async def get_job_result(self, job_id: UUID) -> JobResult | None:
@@ -449,13 +570,19 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
with attempt:
async with self.engine.begin() as conn:
# Retrieve the result
- query = self.t_job_results.select().\
- where(self.t_job_results.c.job_id == job_id)
+ query = self.t_job_results.select().where(
+ self.t_job_results.c.job_id == job_id
+ )
row = (await conn.execute(query)).fetchone()
# Delete the result
- delete = self.t_job_results.delete().\
- where(self.t_job_results.c.job_id == job_id)
+ delete = self.t_job_results.delete().where(
+ self.t_job_results.c.job_id == job_id
+ )
await conn.execute(delete)
- return JobResult.unmarshal(self.serializer, row._asdict()) if row else None
+ return (
+ JobResult.unmarshal(self.serializer, row._asdict())
+ if row
+ else None
+ )
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index 7014bf1..c2d7420 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -13,8 +13,16 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule
from ..enums import ConflictPolicy
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded,
- TaskRemoved, TaskUpdated)
+ JobAcquired,
+ JobAdded,
+ JobReleased,
+ ScheduleAdded,
+ ScheduleRemoved,
+ ScheduleUpdated,
+ TaskAdded,
+ TaskRemoved,
+ TaskUpdated,
+)
from ..exceptions import ConflictingIdError, TaskLookupError
from ..structures import JobResult, Task
from ..util import reentrant
@@ -62,7 +70,9 @@ class ScheduleState:
@attrs.define(order=True)
class JobState:
job: Job = attrs.field(order=False)
- created_at: datetime = attrs.field(init=False, factory=partial(datetime.now, timezone.utc))
+ created_at: datetime = attrs.field(
+ init=False, factory=partial(datetime.now, timezone.utc)
+ )
acquired_by: str | None = attrs.field(eq=False, order=False, default=None)
acquired_until: datetime | None = attrs.field(eq=False, order=False, default=None)
@@ -81,10 +91,14 @@ class MemoryDataStore(DataStore):
_tasks: dict[str, TaskState] = attrs.Factory(dict)
_schedules: list[ScheduleState] = attrs.Factory(list)
_schedules_by_id: dict[str, ScheduleState] = attrs.Factory(dict)
- _schedules_by_task_id: dict[str, set[ScheduleState]] = attrs.Factory(partial(defaultdict, set))
+ _schedules_by_task_id: dict[str, set[ScheduleState]] = attrs.Factory(
+ partial(defaultdict, set)
+ )
_jobs: list[JobState] = attrs.Factory(list)
_jobs_by_id: dict[UUID, JobState] = attrs.Factory(dict)
- _jobs_by_task_id: dict[str, set[JobState]] = attrs.Factory(partial(defaultdict, set))
+ _jobs_by_task_id: dict[str, set[JobState]] = attrs.Factory(
+ partial(defaultdict, set)
+ )
_job_results: dict[UUID, JobResult] = attrs.Factory(dict)
def _find_schedule_index(self, state: ScheduleState) -> int | None:
@@ -109,8 +123,11 @@ class MemoryDataStore(DataStore):
return self._events
def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
- return [state.schedule for state in self._schedules
- if ids is None or state.schedule.id in ids]
+ return [
+ state.schedule
+ for state in self._schedules
+ if ids is None or state.schedule.id in ids
+ ]
def add_task(self, task: Task) -> None:
task_exists = task.id in self._tasks
@@ -135,7 +152,9 @@ class MemoryDataStore(DataStore):
raise TaskLookupError(task_id) from None
def get_tasks(self) -> list[Task]:
- return sorted((state.task for state in self._tasks.values()), key=lambda task: task.id)
+ return sorted(
+ (state.task for state in self._tasks.values()), key=lambda task: task.id
+ )
def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
old_state = self._schedules_by_id.get(schedule.id)
@@ -155,11 +174,13 @@ class MemoryDataStore(DataStore):
insort_right(self._schedules, state)
if old_state is not None:
- event = ScheduleUpdated(schedule_id=schedule.id,
- next_fire_time=schedule.next_fire_time)
+ event = ScheduleUpdated(
+ schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
+ )
else:
- event = ScheduleAdded(schedule_id=schedule.id,
- next_fire_time=schedule.next_fire_time)
+ event = ScheduleAdded(
+ schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
+ )
self._events.publish(event)
@@ -207,7 +228,9 @@ class MemoryDataStore(DataStore):
schedule_state.acquired_by = None
schedule_state.acquired_until = None
insort_right(self._schedules, schedule_state)
- event = ScheduleUpdated(schedule_id=s.id, next_fire_time=s.next_fire_time)
+ event = ScheduleUpdated(
+ schedule_id=s.id, next_fire_time=s.next_fire_time
+ )
self._events.publish(event)
else:
finished_schedule_ids.append(s.id)
@@ -224,8 +247,12 @@ class MemoryDataStore(DataStore):
self._jobs_by_id[job.id] = state
self._jobs_by_task_id[job.task_id].add(state)
- event = JobAdded(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
- tags=job.tags)
+ event = JobAdded(
+ job_id=job.id,
+ task_id=job.task_id,
+ schedule_id=job.schedule_id,
+ tags=job.tags,
+ )
self._events.publish(event)
def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]:
@@ -248,14 +275,18 @@ class MemoryDataStore(DataStore):
task_state.running_jobs -= 1
# Check if the task allows one more job to be started
- if (task_state.task.max_running_jobs is not None
- and task_state.running_jobs >= task_state.task.max_running_jobs):
+ if (
+ task_state.task.max_running_jobs is not None
+ and task_state.running_jobs >= task_state.task.max_running_jobs
+ ):
continue
# Mark the job as acquired by this worker
jobs.append(job_state.job)
job_state.acquired_by = worker_id
- job_state.acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
+ job_state.acquired_until = now + timedelta(
+ seconds=self.lock_expiration_delay
+ )
# Increment the number of running jobs for this task
task_state.running_jobs += 1
@@ -287,7 +318,9 @@ class MemoryDataStore(DataStore):
# Publish the event
self._events.publish(
- JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ JobReleased(
+ job_id=result.job_id, worker_id=worker_id, outcome=result.outcome
+ )
)
def get_job_result(self, job_id: UUID) -> JobResult | None:
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 7d6fa76..482d85e 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -23,10 +23,23 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- DataStoreEvent, JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved,
- ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
+ DataStoreEvent,
+ JobAcquired,
+ JobAdded,
+ JobReleased,
+ ScheduleAdded,
+ ScheduleRemoved,
+ ScheduleUpdated,
+ TaskAdded,
+ TaskRemoved,
+ TaskUpdated,
+)
from ..exceptions import (
- ConflictingIdError, DeserializationError, SerializationError, TaskLookupError)
+ ConflictingIdError,
+ DeserializationError,
+ SerializationError,
+ TaskLookupError,
+)
from ..serializers.pickle import PickleSerializer
from ..structures import JobResult, RetrySettings, Task
from ..util import reentrant
@@ -55,13 +68,15 @@ def ensure_uuid_presentation(client: MongoClient) -> None:
class MongoDBDataStore(DataStore):
client: MongoClient = attrs.field(validator=instance_of(MongoClient))
serializer: Serializer = attrs.field(factory=PickleSerializer, kw_only=True)
- database: str = attrs.field(default='apscheduler', kw_only=True)
+ database: str = attrs.field(default="apscheduler", kw_only=True)
lock_expiration_delay: float = attrs.field(default=30, kw_only=True)
retry_settings: RetrySettings = attrs.field(default=RetrySettings())
start_from_scratch: bool = attrs.field(default=False, kw_only=True)
_task_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Task)]
- _schedule_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Schedule)]
+ _schedule_attrs: ClassVar[list[str]] = [
+ field.name for field in attrs.fields(Schedule)
+ ]
_job_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Job)]
_logger: Logger = attrs.field(init=False, factory=lambda: getLogger(__name__))
@@ -72,23 +87,32 @@ class MongoDBDataStore(DataStore):
def __attrs_post_init__(self) -> None:
# Construct the Tenacity retry controller
- self._retrying = Retrying(stop=self.retry_settings.stop, wait=self.retry_settings.wait,
- retry=tenacity.retry_if_exception_type(ConnectionFailure),
- after=self._after_attempt, reraise=True)
-
- type_registry = TypeRegistry([
- CustomEncoder(timedelta, timedelta.total_seconds),
- CustomEncoder(ConflictPolicy, operator.attrgetter('name')),
- CustomEncoder(CoalescePolicy, operator.attrgetter('name')),
- CustomEncoder(JobOutcome, operator.attrgetter('name'))
- ])
- codec_options = CodecOptions(tz_aware=True, type_registry=type_registry,
- uuid_representation=UuidRepresentation.STANDARD)
+ self._retrying = Retrying(
+ stop=self.retry_settings.stop,
+ wait=self.retry_settings.wait,
+ retry=tenacity.retry_if_exception_type(ConnectionFailure),
+ after=self._after_attempt,
+ reraise=True,
+ )
+
+ type_registry = TypeRegistry(
+ [
+ CustomEncoder(timedelta, timedelta.total_seconds),
+ CustomEncoder(ConflictPolicy, operator.attrgetter("name")),
+ CustomEncoder(CoalescePolicy, operator.attrgetter("name")),
+ CustomEncoder(JobOutcome, operator.attrgetter("name")),
+ ]
+ )
+ codec_options = CodecOptions(
+ tz_aware=True,
+ type_registry=type_registry,
+ uuid_representation=UuidRepresentation.STANDARD,
+ )
database = self.client.get_database(self.database, codec_options=codec_options)
- self._tasks: Collection = database['tasks']
- self._schedules: Collection = database['schedules']
- self._jobs: Collection = database['jobs']
- self._jobs_results: Collection = database['job_results']
+ self._tasks: Collection = database["tasks"]
+ self._schedules: Collection = database["schedules"]
+ self._jobs: Collection = database["jobs"]
+ self._jobs_results: Collection = database["job_results"]
@classmethod
def from_url(cls, uri: str, **options) -> MongoDBDataStore:
@@ -100,14 +124,19 @@ class MongoDBDataStore(DataStore):
return self._events
def _after_attempt(self, retry_state: tenacity.RetryCallState) -> None:
- self._logger.warning('Temporary data store error (attempt %d): %s',
- retry_state.attempt_number, retry_state.outcome.exception())
+ self._logger.warning(
+ "Temporary data store error (attempt %d): %s",
+ retry_state.attempt_number,
+ retry_state.outcome.exception(),
+ )
def __enter__(self):
server_info = self.client.server_info()
- if server_info['versionArray'] < [4, 0]:
- raise RuntimeError(f"MongoDB server must be at least v4.0; current version = "
- f"{server_info['version']}")
+ if server_info["versionArray"] < [4, 0]:
+ raise RuntimeError(
+ f"MongoDB server must be at least v4.0; current version = "
+ f"{server_info['version']}"
+ )
self._exit_stack.__enter__()
self._exit_stack.enter_context(self._events)
@@ -120,11 +149,11 @@ class MongoDBDataStore(DataStore):
self._jobs.delete_many({}, session=session)
self._jobs_results.delete_many({}, session=session)
- self._schedules.create_index('next_fire_time', session=session)
- self._jobs.create_index('task_id', session=session)
- self._jobs.create_index('created_at', session=session)
- self._jobs.create_index('tags', session=session)
- self._jobs_results.create_index('finished_at', session=session)
+ self._schedules.create_index("next_fire_time", session=session)
+ self._jobs.create_index("task_id", session=session)
+ self._jobs.create_index("created_at", session=session)
+ self._jobs.create_index("tags", session=session)
+ self._jobs_results.create_index("finished_at", session=session)
return self
@@ -135,10 +164,12 @@ class MongoDBDataStore(DataStore):
for attempt in self._retrying:
with attempt:
previous = self._tasks.find_one_and_update(
- {'_id': task.id},
- {'$set': task.marshal(self.serializer),
- '$setOnInsert': {'running_jobs': 0}},
- upsert=True
+ {"_id": task.id},
+ {
+ "$set": task.marshal(self.serializer),
+ "$setOnInsert": {"running_jobs": 0},
+ },
+ upsert=True,
)
self._local_tasks[task.id] = task
@@ -150,7 +181,7 @@ class MongoDBDataStore(DataStore):
def remove_task(self, task_id: str) -> None:
for attempt in self._retrying:
with attempt:
- if not self._tasks.find_one_and_delete({'_id': task_id}):
+ if not self._tasks.find_one_and_delete({"_id": task_id}):
raise TaskLookupError(task_id)
del self._local_tasks[task_id]
@@ -162,38 +193,45 @@ class MongoDBDataStore(DataStore):
except KeyError:
for attempt in self._retrying:
with attempt:
- document = self._tasks.find_one({'_id': task_id}, projection=self._task_attrs)
+ document = self._tasks.find_one(
+ {"_id": task_id}, projection=self._task_attrs
+ )
if not document:
raise TaskLookupError(task_id)
- document['id'] = document.pop('id')
- task = self._local_tasks[task_id] = Task.unmarshal(self.serializer, document)
+ document["id"] = document.pop("id")
+ task = self._local_tasks[task_id] = Task.unmarshal(
+ self.serializer, document
+ )
return task
def get_tasks(self) -> list[Task]:
for attempt in self._retrying:
with attempt:
tasks: list[Task] = []
- for document in self._tasks.find(projection=self._task_attrs,
- sort=[('_id', pymongo.ASCENDING)]):
- document['id'] = document.pop('_id')
+ for document in self._tasks.find(
+ projection=self._task_attrs, sort=[("_id", pymongo.ASCENDING)]
+ ):
+ document["id"] = document.pop("_id")
tasks.append(Task.unmarshal(self.serializer, document))
return tasks
def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
- filters = {'_id': {'$in': list(ids)}} if ids is not None else {}
+ filters = {"_id": {"$in": list(ids)}} if ids is not None else {}
for attempt in self._retrying:
with attempt:
schedules: list[Schedule] = []
- cursor = self._schedules.find(filters).sort('_id')
+ cursor = self._schedules.find(filters).sort("_id")
for document in cursor:
- document['id'] = document.pop('_id')
+ document["id"] = document.pop("_id")
try:
schedule = Schedule.unmarshal(self.serializer, document)
except DeserializationError:
- self._logger.warning('Failed to deserialize schedule %r', document['_id'])
+ self._logger.warning(
+ "Failed to deserialize schedule %r", document["_id"]
+ )
continue
schedules.append(schedule)
@@ -203,7 +241,7 @@ class MongoDBDataStore(DataStore):
def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
event: DataStoreEvent
document = schedule.marshal(self.serializer)
- document['_id'] = document.pop('id')
+ document["_id"] = document.pop("id")
try:
for attempt in self._retrying:
with attempt:
@@ -214,23 +252,28 @@ class MongoDBDataStore(DataStore):
elif conflict_policy is ConflictPolicy.replace:
for attempt in self._retrying:
with attempt:
- self._schedules.replace_one({'_id': schedule.id}, document, True)
+ self._schedules.replace_one(
+ {"_id": schedule.id}, document, True
+ )
event = ScheduleUpdated(
- schedule_id=schedule.id,
- next_fire_time=schedule.next_fire_time)
+ schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
+ )
self._events.publish(event)
else:
- event = ScheduleAdded(schedule_id=schedule.id,
- next_fire_time=schedule.next_fire_time)
+ event = ScheduleAdded(
+ schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
+ )
self._events.publish(event)
def remove_schedules(self, ids: Iterable[str]) -> None:
- filters = {'_id': {'$in': list(ids)}} if ids is not None else {}
+ filters = {"_id": {"$in": list(ids)}} if ids is not None else {}
for attempt in self._retrying:
with attempt, self.client.start_session() as session:
- cursor = self._schedules.find(filters, projection=['_id'], session=session)
- ids = [doc['_id'] for doc in cursor]
+ cursor = self._schedules.find(
+ filters, projection=["_id"], session=session
+ )
+ ids = [doc["_id"] for doc in cursor]
if ids:
self._schedules.delete_many(filters, session=session)
@@ -241,25 +284,37 @@ class MongoDBDataStore(DataStore):
for attempt in self._retrying:
with attempt, self.client.start_session() as session:
schedules: list[Schedule] = []
- cursor = self._schedules.find(
- {'next_fire_time': {'$ne': None},
- '$or': [{'acquired_until': {'$exists': False}},
- {'acquired_until': {'$lt': datetime.now(timezone.utc)}}]
- },
- session=session
- ).sort('next_fire_time').limit(limit)
+ cursor = (
+ self._schedules.find(
+ {
+ "next_fire_time": {"$ne": None},
+ "$or": [
+ {"acquired_until": {"$exists": False}},
+ {"acquired_until": {"$lt": datetime.now(timezone.utc)}},
+ ],
+ },
+ session=session,
+ )
+ .sort("next_fire_time")
+ .limit(limit)
+ )
for document in cursor:
- document['id'] = document.pop('_id')
+ document["id"] = document.pop("_id")
schedule = Schedule.unmarshal(self.serializer, document)
schedules.append(schedule)
if schedules:
now = datetime.now(timezone.utc)
acquired_until = datetime.fromtimestamp(
- now.timestamp() + self.lock_expiration_delay, now.tzinfo)
- filters = {'_id': {'$in': [schedule.id for schedule in schedules]}}
- update = {'$set': {'acquired_by': scheduler_id,
- 'acquired_until': acquired_until}}
+ now.timestamp() + self.lock_expiration_delay, now.tzinfo
+ )
+ filters = {"_id": {"$in": [schedule.id for schedule in schedules]}}
+ update = {
+ "$set": {
+ "acquired_by": scheduler_id,
+ "acquired_until": acquired_until,
+ }
+ }
self._schedules.update_many(filters, update, session=session)
return schedules
@@ -271,26 +326,28 @@ class MongoDBDataStore(DataStore):
# Update schedules that have a next fire time
requests = []
for schedule in schedules:
- filters = {'_id': schedule.id, 'acquired_by': scheduler_id}
+ filters = {"_id": schedule.id, "acquired_by": scheduler_id}
if schedule.next_fire_time is not None:
try:
serialized_trigger = self.serializer.serialize(schedule.trigger)
except SerializationError:
- self._logger.exception('Error serializing schedule %r – '
- 'removing from data store', schedule.id)
+ self._logger.exception(
+ "Error serializing schedule %r – " "removing from data store",
+ schedule.id,
+ )
requests.append(DeleteOne(filters))
finished_schedule_ids.append(schedule.id)
continue
update = {
- '$unset': {
- 'acquired_by': True,
- 'acquired_until': True,
+ "$unset": {
+ "acquired_by": True,
+ "acquired_until": True,
+ },
+ "$set": {
+ "trigger": serialized_trigger,
+ "next_fire_time": schedule.next_fire_time,
},
- '$set': {
- 'trigger': serialized_trigger,
- 'next_fire_time': schedule.next_fire_time
- }
}
requests.append(UpdateOne(filters, update))
updated_schedules.append((schedule.id, schedule.next_fire_time))
@@ -301,11 +358,14 @@ class MongoDBDataStore(DataStore):
if requests:
for attempt in self._retrying:
with attempt, self.client.start_session() as session:
- self._schedules.bulk_write(requests, ordered=False, session=session)
+ self._schedules.bulk_write(
+ requests, ordered=False, session=session
+ )
for schedule_id, next_fire_time in updated_schedules:
- event = ScheduleUpdated(schedule_id=schedule_id,
- next_fire_time=next_fire_time)
+ event = ScheduleUpdated(
+ schedule_id=schedule_id, next_fire_time=next_fire_time
+ )
self._events.publish(event)
for schedule_id in finished_schedule_ids:
@@ -314,38 +374,46 @@ class MongoDBDataStore(DataStore):
def get_next_schedule_run_time(self) -> datetime | None:
for attempt in self._retrying:
with attempt:
- document = self._schedules.find_one({'next_run_time': {'$ne': None}},
- projection=['next_run_time'],
- sort=[('next_run_time', ASCENDING)])
+ document = self._schedules.find_one(
+ {"next_run_time": {"$ne": None}},
+ projection=["next_run_time"],
+ sort=[("next_run_time", ASCENDING)],
+ )
if document:
- return document['next_run_time']
+ return document["next_run_time"]
else:
return None
def add_job(self, job: Job) -> None:
document = job.marshal(self.serializer)
- document['_id'] = document.pop('id')
+ document["_id"] = document.pop("id")
for attempt in self._retrying:
with attempt:
self._jobs.insert_one(document)
- event = JobAdded(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
- tags=job.tags)
+ event = JobAdded(
+ job_id=job.id,
+ task_id=job.task_id,
+ schedule_id=job.schedule_id,
+ tags=job.tags,
+ )
self._events.publish(event)
def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]:
- filters = {'_id': {'$in': list(ids)}} if ids is not None else {}
+ filters = {"_id": {"$in": list(ids)}} if ids is not None else {}
for attempt in self._retrying:
with attempt:
jobs: list[Job] = []
- cursor = self._jobs.find(filters).sort('_id')
+ cursor = self._jobs.find(filters).sort("_id")
for document in cursor:
- document['id'] = document.pop('_id')
+ document["id"] = document.pop("_id")
try:
job = Job.unmarshal(self.serializer, document)
except DeserializationError:
- self._logger.warning('Failed to deserialize job %r', document['id'])
+ self._logger.warning(
+ "Failed to deserialize job %r", document["id"]
+ )
continue
jobs.append(job)
@@ -356,30 +424,35 @@ class MongoDBDataStore(DataStore):
for attempt in self._retrying:
with attempt, self.client.start_session() as session:
cursor = self._jobs.find(
- {'$or': [{'acquired_until': {'$exists': False}},
- {'acquired_until': {'$lt': datetime.now(timezone.utc)}}]
- },
- sort=[('created_at', ASCENDING)],
+ {
+ "$or": [
+ {"acquired_until": {"$exists": False}},
+ {"acquired_until": {"$lt": datetime.now(timezone.utc)}},
+ ]
+ },
+ sort=[("created_at", ASCENDING)],
limit=limit,
- session=session
+ session=session,
)
documents = list(cursor)
# Retrieve the limits
- task_ids: set[str] = {document['task_id'] for document in documents}
+ task_ids: set[str] = {document["task_id"] for document in documents}
task_limits = self._tasks.find(
- {'_id': {'$in': list(task_ids)}, 'max_running_jobs': {'$ne': None}},
- projection=['max_running_jobs', 'running_jobs'],
- session=session
+ {"_id": {"$in": list(task_ids)}, "max_running_jobs": {"$ne": None}},
+ projection=["max_running_jobs", "running_jobs"],
+ session=session,
)
- job_slots_left = {doc['_id']: doc['max_running_jobs'] - doc['running_jobs']
- for doc in task_limits}
+ job_slots_left = {
+ doc["_id"]: doc["max_running_jobs"] - doc["running_jobs"]
+ for doc in task_limits
+ }
# Filter out jobs that don't have free slots
acquired_jobs: list[Job] = []
increments: dict[str, int] = defaultdict(lambda: 0)
for document in documents:
- document['id'] = document.pop('_id')
+ document["id"] = document.pop("_id")
job = Job.unmarshal(self.serializer, document)
# Don't acquire the job if there are no free slots left
@@ -395,18 +468,23 @@ class MongoDBDataStore(DataStore):
if acquired_jobs:
now = datetime.now(timezone.utc)
acquired_until = datetime.fromtimestamp(
- now.timestamp() + self.lock_expiration_delay, timezone.utc)
- filters = {'_id': {'$in': [job.id for job in acquired_jobs]}}
- update = {'$set': {'acquired_by': worker_id,
- 'acquired_until': acquired_until}}
+ now.timestamp() + self.lock_expiration_delay, timezone.utc
+ )
+ filters = {"_id": {"$in": [job.id for job in acquired_jobs]}}
+ update = {
+ "$set": {
+ "acquired_by": worker_id,
+ "acquired_until": acquired_until,
+ }
+ }
self._jobs.update_many(filters, update, session=session)
# Increment the running job counters on each task
for task_id, increment in increments.items():
self._tasks.find_one_and_update(
- {'_id': task_id},
- {'$inc': {'running_jobs': increment}},
- session=session
+ {"_id": task_id},
+ {"$inc": {"running_jobs": increment}},
+ session=session,
)
# Publish the appropriate events
@@ -420,31 +498,31 @@ class MongoDBDataStore(DataStore):
with attempt, self.client.start_session() as session:
# Insert the job result
document = result.marshal(self.serializer)
- document['_id'] = document.pop('job_id')
+ document["_id"] = document.pop("job_id")
self._jobs_results.insert_one(document, session=session)
# Decrement the running jobs counter
self._tasks.find_one_and_update(
- {'_id': task_id},
- {'$inc': {'running_jobs': -1}},
- session=session
+ {"_id": task_id}, {"$inc": {"running_jobs": -1}}, session=session
)
# Delete the job
- self._jobs.delete_one({'_id': result.job_id}, session=session)
+ self._jobs.delete_one({"_id": result.job_id}, session=session)
# Publish the event
self._events.publish(
- JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ JobReleased(
+ job_id=result.job_id, worker_id=worker_id, outcome=result.outcome
+ )
)
def get_job_result(self, job_id: UUID) -> JobResult | None:
for attempt in self._retrying:
with attempt:
- document = self._jobs_results.find_one_and_delete({'_id': job_id})
+ document = self._jobs_results.find_one_and_delete({"_id": job_id})
if document:
- document['job_id'] = document.pop('_id')
+ document["job_id"] = document.pop("_id")
return JobResult.unmarshal(self.serializer, document)
else:
return None
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index ee9fd34..f15bcaf 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -9,8 +9,22 @@ from uuid import UUID
import attrs
import tenacity
from sqlalchemy import (
- JSON, TIMESTAMP, BigInteger, Column, Enum, Integer, LargeBinary, MetaData, Table,
- TypeDecorator, Unicode, and_, bindparam, or_, select)
+ JSON,
+ TIMESTAMP,
+ BigInteger,
+ Column,
+ Enum,
+ Integer,
+ LargeBinary,
+ MetaData,
+ Table,
+ TypeDecorator,
+ Unicode,
+ and_,
+ bindparam,
+ or_,
+ select,
+)
from sqlalchemy.engine import URL, Dialect, Result
from sqlalchemy.exc import CompileError, IntegrityError, OperationalError
from sqlalchemy.future import Engine, create_engine
@@ -21,9 +35,19 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- Event, JobAcquired, JobAdded, JobDeserializationFailed, JobReleased, ScheduleAdded,
- ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved,
- TaskUpdated)
+ Event,
+ JobAcquired,
+ JobAdded,
+ JobDeserializationFailed,
+ JobReleased,
+ ScheduleAdded,
+ ScheduleDeserializationFailed,
+ ScheduleRemoved,
+ ScheduleUpdated,
+ TaskAdded,
+ TaskRemoved,
+ TaskUpdated,
+)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..serializers.pickle import PickleSerializer
@@ -79,11 +103,11 @@ class _BaseSQLAlchemyDataStore:
def __attrs_post_init__(self) -> None:
# Generate the table definitions
self._metadata = self.get_table_definitions()
- self.t_metadata = self._metadata.tables['metadata']
- self.t_tasks = self._metadata.tables['tasks']
- self.t_schedules = self._metadata.tables['schedules']
- self.t_jobs = self._metadata.tables['jobs']
- self.t_job_results = self._metadata.tables['job_results']
+ self.t_metadata = self._metadata.tables["metadata"]
+ self.t_tasks = self._metadata.tables["tasks"]
+ self.t_schedules = self._metadata.tables["schedules"]
+ self.t_jobs = self._metadata.tables["jobs"]
+ self.t_job_results = self._metadata.tables["job_results"]
# Find out if the dialect supports UPDATE...RETURNING
update = self.t_jobs.update().returning(self.t_jobs.c.id)
@@ -95,11 +119,14 @@ class _BaseSQLAlchemyDataStore:
self._supports_update_returning = True
def _after_attempt(self, retry_state: tenacity.RetryCallState) -> None:
- self._logger.warning('Temporary data store error (attempt %d): %s',
- retry_state.attempt_number, retry_state.outcome.exception())
+ self._logger.warning(
+ "Temporary data store error (attempt %d): %s",
+ retry_state.attempt_number,
+ retry_state.outcome.exception(),
+ )
def get_table_definitions(self) -> MetaData:
- if self.engine.dialect.name == 'postgresql':
+ if self.engine.dialect.name == "postgresql":
from sqlalchemy.dialects import postgresql
timestamp_type = TIMESTAMP(timezone=True)
@@ -111,63 +138,59 @@ class _BaseSQLAlchemyDataStore:
interval_type = EmulatedInterval
metadata = MetaData()
+ Table("metadata", metadata, Column("schema_version", Integer, nullable=False))
Table(
- 'metadata',
- metadata,
- Column('schema_version', Integer, nullable=False)
- )
- Table(
- 'tasks',
+ "tasks",
metadata,
- Column('id', Unicode(500), primary_key=True),
- Column('func', Unicode(500), nullable=False),
- Column('state', LargeBinary),
- Column('max_running_jobs', Integer),
- Column('misfire_grace_time', interval_type),
- Column('running_jobs', Integer, nullable=False, server_default=literal(0))
+ Column("id", Unicode(500), primary_key=True),
+ Column("func", Unicode(500), nullable=False),
+ Column("state", LargeBinary),
+ Column("max_running_jobs", Integer),
+ Column("misfire_grace_time", interval_type),
+ Column("running_jobs", Integer, nullable=False, server_default=literal(0)),
)
Table(
- 'schedules',
+ "schedules",
metadata,
- Column('id', Unicode(500), primary_key=True),
- Column('task_id', Unicode(500), nullable=False, index=True),
- Column('trigger', LargeBinary),
- Column('args', LargeBinary),
- Column('kwargs', LargeBinary),
- Column('coalesce', Enum(CoalescePolicy), nullable=False),
- Column('misfire_grace_time', interval_type),
- Column('max_jitter', interval_type),
- Column('tags', JSON, nullable=False),
- Column('next_fire_time', timestamp_type, index=True),
- Column('last_fire_time', timestamp_type),
- Column('acquired_by', Unicode(500)),
- Column('acquired_until', timestamp_type)
+ Column("id", Unicode(500), primary_key=True),
+ Column("task_id", Unicode(500), nullable=False, index=True),
+ Column("trigger", LargeBinary),
+ Column("args", LargeBinary),
+ Column("kwargs", LargeBinary),
+ Column("coalesce", Enum(CoalescePolicy), nullable=False),
+ Column("misfire_grace_time", interval_type),
+ Column("max_jitter", interval_type),
+ Column("tags", JSON, nullable=False),
+ Column("next_fire_time", timestamp_type, index=True),
+ Column("last_fire_time", timestamp_type),
+ Column("acquired_by", Unicode(500)),
+ Column("acquired_until", timestamp_type),
)
Table(
- 'jobs',
+ "jobs",
metadata,
- Column('id', job_id_type, primary_key=True),
- Column('task_id', Unicode(500), nullable=False, index=True),
- Column('args', LargeBinary, nullable=False),
- Column('kwargs', LargeBinary, nullable=False),
- Column('schedule_id', Unicode(500)),
- Column('scheduled_fire_time', timestamp_type),
- Column('jitter', interval_type),
- Column('start_deadline', timestamp_type),
- Column('tags', JSON, nullable=False),
- Column('created_at', timestamp_type, nullable=False),
- Column('started_at', timestamp_type),
- Column('acquired_by', Unicode(500)),
- Column('acquired_until', timestamp_type)
+ Column("id", job_id_type, primary_key=True),
+ Column("task_id", Unicode(500), nullable=False, index=True),
+ Column("args", LargeBinary, nullable=False),
+ Column("kwargs", LargeBinary, nullable=False),
+ Column("schedule_id", Unicode(500)),
+ Column("scheduled_fire_time", timestamp_type),
+ Column("jitter", interval_type),
+ Column("start_deadline", timestamp_type),
+ Column("tags", JSON, nullable=False),
+ Column("created_at", timestamp_type, nullable=False),
+ Column("started_at", timestamp_type),
+ Column("acquired_by", Unicode(500)),
+ Column("acquired_until", timestamp_type),
)
Table(
- 'job_results',
+ "job_results",
metadata,
- Column('job_id', job_id_type, primary_key=True),
- Column('outcome', Enum(JobOutcome), nullable=False),
- Column('finished_at', timestamp_type, index=True),
- Column('exception', LargeBinary),
- Column('return_value', LargeBinary)
+ Column("job_id", job_id_type, primary_key=True),
+ Column("outcome", Enum(JobOutcome), nullable=False),
+ Column("finished_at", timestamp_type, index=True),
+ Column("exception", LargeBinary),
+ Column("return_value", LargeBinary),
)
return metadata
@@ -178,7 +201,8 @@ class _BaseSQLAlchemyDataStore:
schedules.append(Schedule.unmarshal(self.serializer, row._asdict()))
except SerializationError as exc:
self._events.publish(
- ScheduleDeserializationFailed(schedule_id=row['id'], exception=exc))
+ ScheduleDeserializationFailed(schedule_id=row["id"], exception=exc)
+ )
return schedules
@@ -189,7 +213,8 @@ class _BaseSQLAlchemyDataStore:
jobs.append(Job.unmarshal(self.serializer, row._asdict()))
except SerializationError as exc:
self._events.publish(
- JobDeserializationFailed(job_id=row['id'], exception=exc))
+ JobDeserializationFailed(job_id=row["id"], exception=exc)
+ )
return jobs
@@ -212,9 +237,12 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
# Construct the Tenacity retry controller
self._retrying = tenacity.Retrying(
- stop=self.retry_settings.stop, wait=self.retry_settings.wait,
- retry=tenacity.retry_if_exception_type(OperationalError), after=self._after_attempt,
- reraise=True)
+ stop=self.retry_settings.stop,
+ wait=self.retry_settings.wait,
+ retry=tenacity.retry_if_exception_type(OperationalError),
+ after=self._after_attempt,
+ reraise=True,
+ )
def __enter__(self):
for attempt in self._retrying:
@@ -228,11 +256,12 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
result = conn.execute(query)
version = result.scalar()
if version is None:
- conn.execute(self.t_metadata.insert(values={'schema_version': 1}))
+ conn.execute(self.t_metadata.insert(values={"schema_version": 1}))
elif version > 1:
raise RuntimeError(
- f'Unexpected schema version ({version}); '
- f'only version 1 is supported by this version of APScheduler')
+ f"Unexpected schema version ({version}); "
+ f"only version 1 is supported by this version of APScheduler"
+ )
self._events.__enter__()
return self
@@ -245,19 +274,26 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
return self._events
def add_task(self, task: Task) -> None:
- insert = self.t_tasks.insert().\
- values(id=task.id, func=callable_to_ref(task.func),
- max_running_jobs=task.max_running_jobs,
- misfire_grace_time=task.misfire_grace_time)
+ insert = self.t_tasks.insert().values(
+ id=task.id,
+ func=callable_to_ref(task.func),
+ max_running_jobs=task.max_running_jobs,
+ misfire_grace_time=task.misfire_grace_time,
+ )
try:
for attempt in self._retrying:
with attempt, self.engine.begin() as conn:
conn.execute(insert)
except IntegrityError:
- update = self.t_tasks.update().\
- values(func=callable_to_ref(task.func), max_running_jobs=task.max_running_jobs,
- misfire_grace_time=task.misfire_grace_time).\
- where(self.t_tasks.c.id == task.id)
+ update = (
+ self.t_tasks.update()
+ .values(
+ func=callable_to_ref(task.func),
+ max_running_jobs=task.max_running_jobs,
+ misfire_grace_time=task.misfire_grace_time,
+ )
+ .where(self.t_tasks.c.id == task.id)
+ )
for attempt in self._retrying:
with attempt, self.engine.begin() as conn:
conn.execute(update)
@@ -276,9 +312,15 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
self._events.publish(TaskRemoved(task_id=task_id))
def get_task(self, task_id: str) -> Task:
- query = select([self.t_tasks.c.id, self.t_tasks.c.func, self.t_tasks.c.max_running_jobs,
- self.t_tasks.c.state, self.t_tasks.c.misfire_grace_time]).\
- where(self.t_tasks.c.id == task_id)
+ query = select(
+ [
+ self.t_tasks.c.id,
+ self.t_tasks.c.func,
+ self.t_tasks.c.max_running_jobs,
+ self.t_tasks.c.state,
+ self.t_tasks.c.misfire_grace_time,
+ ]
+ ).where(self.t_tasks.c.id == task_id)
for attempt in self._retrying:
with attempt, self.engine.begin() as conn:
result = conn.execute(query)
@@ -290,13 +332,21 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
raise TaskLookupError
def get_tasks(self) -> list[Task]:
- query = select([self.t_tasks.c.id, self.t_tasks.c.func, self.t_tasks.c.max_running_jobs,
- self.t_tasks.c.state, self.t_tasks.c.misfire_grace_time]).\
- order_by(self.t_tasks.c.id)
+ query = select(
+ [
+ self.t_tasks.c.id,
+ self.t_tasks.c.func,
+ self.t_tasks.c.max_running_jobs,
+ self.t_tasks.c.state,
+ self.t_tasks.c.misfire_grace_time,
+ ]
+ ).order_by(self.t_tasks.c.id)
for attempt in self._retrying:
with attempt, self.engine.begin() as conn:
result = conn.execute(query)
- tasks = [Task.unmarshal(self.serializer, row._asdict()) for row in result]
+ tasks = [
+ Task.unmarshal(self.serializer, row._asdict()) for row in result
+ ]
return tasks
def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
@@ -311,20 +361,24 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
if conflict_policy is ConflictPolicy.exception:
raise ConflictingIdError(schedule.id) from None
elif conflict_policy is ConflictPolicy.replace:
- del values['id']
- update = self.t_schedules.update().\
- where(self.t_schedules.c.id == schedule.id).\
- values(**values)
+ del values["id"]
+ update = (
+ self.t_schedules.update()
+ .where(self.t_schedules.c.id == schedule.id)
+ .values(**values)
+ )
for attempt in self._retrying:
with attempt, self.engine.begin() as conn:
conn.execute(update)
- event = ScheduleUpdated(schedule_id=schedule.id,
- next_fire_time=schedule.next_fire_time)
+ event = ScheduleUpdated(
+ schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
+ )
self._events.publish(event)
else:
- event = ScheduleAdded(schedule_id=schedule.id,
- next_fire_time=schedule.next_fire_time)
+ event = ScheduleAdded(
+ schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
+ )
self._events.publish(event)
def remove_schedules(self, ids: Iterable[str]) -> None:
@@ -333,7 +387,9 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
delete = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids))
if self._supports_update_returning:
delete = delete.returning(self.t_schedules.c.id)
- removed_ids: Iterable[str] = [row[0] for row in conn.execute(delete)]
+ removed_ids: Iterable[str] = [
+ row[0] for row in conn.execute(delete)
+ ]
else:
# TODO: actually check which rows were deleted?
conn.execute(delete)
@@ -357,24 +413,37 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
with attempt, self.engine.begin() as conn:
now = datetime.now(timezone.utc)
acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
- schedules_cte = select(self.t_schedules.c.id).\
- where(and_(self.t_schedules.c.next_fire_time.isnot(None),
- self.t_schedules.c.next_fire_time <= now,
- or_(self.t_schedules.c.acquired_until.is_(None),
- self.t_schedules.c.acquired_until < now))).\
- order_by(self.t_schedules.c.next_fire_time).\
- limit(limit).with_for_update(skip_locked=True).cte()
+ schedules_cte = (
+ select(self.t_schedules.c.id)
+ .where(
+ and_(
+ self.t_schedules.c.next_fire_time.isnot(None),
+ self.t_schedules.c.next_fire_time <= now,
+ or_(
+ self.t_schedules.c.acquired_until.is_(None),
+ self.t_schedules.c.acquired_until < now,
+ ),
+ )
+ )
+ .order_by(self.t_schedules.c.next_fire_time)
+ .limit(limit)
+ .with_for_update(skip_locked=True)
+ .cte()
+ )
subselect = select([schedules_cte.c.id])
- update = self.t_schedules.update().\
- where(self.t_schedules.c.id.in_(subselect)).\
- values(acquired_by=scheduler_id, acquired_until=acquired_until)
+ update = (
+ self.t_schedules.update()
+ .where(self.t_schedules.c.id.in_(subselect))
+ .values(acquired_by=scheduler_id, acquired_until=acquired_until)
+ )
if self._supports_update_returning:
update = update.returning(*self.t_schedules.columns)
result = conn.execute(update)
else:
conn.execute(update)
- query = self.t_schedules.select().\
- where(and_(self.t_schedules.c.acquired_by == scheduler_id))
+ query = self.t_schedules.select().where(
+ and_(self.t_schedules.c.acquired_by == scheduler_id)
+ )
result = conn.execute(query)
schedules = self._deserialize_schedules(result)
@@ -390,49 +459,73 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
for schedule in schedules:
if schedule.next_fire_time is not None:
try:
- serialized_trigger = self.serializer.serialize(schedule.trigger)
+ serialized_trigger = self.serializer.serialize(
+ schedule.trigger
+ )
except SerializationError:
- self._logger.exception('Error serializing trigger for schedule %r – '
- 'removing from data store', schedule.id)
+ self._logger.exception(
+ "Error serializing trigger for schedule %r – "
+ "removing from data store",
+ schedule.id,
+ )
finished_schedule_ids.append(schedule.id)
continue
- update_args.append({
- 'p_id': schedule.id,
- 'p_trigger': serialized_trigger,
- 'p_next_fire_time': schedule.next_fire_time
- })
+ update_args.append(
+ {
+ "p_id": schedule.id,
+ "p_trigger": serialized_trigger,
+ "p_next_fire_time": schedule.next_fire_time,
+ }
+ )
else:
finished_schedule_ids.append(schedule.id)
# Update schedules that have a next fire time
if update_args:
- p_id: BindParameter = bindparam('p_id')
- p_trigger: BindParameter = bindparam('p_trigger')
- p_next_fire_time: BindParameter = bindparam('p_next_fire_time')
- update = self.t_schedules.update().\
- where(and_(self.t_schedules.c.id == p_id,
- self.t_schedules.c.acquired_by == scheduler_id)).\
- values(trigger=p_trigger, next_fire_time=p_next_fire_time,
- acquired_by=None, acquired_until=None)
- next_fire_times = {arg['p_id']: arg['p_next_fire_time'] for arg in update_args}
+ p_id: BindParameter = bindparam("p_id")
+ p_trigger: BindParameter = bindparam("p_trigger")
+ p_next_fire_time: BindParameter = bindparam("p_next_fire_time")
+ update = (
+ self.t_schedules.update()
+ .where(
+ and_(
+ self.t_schedules.c.id == p_id,
+ self.t_schedules.c.acquired_by == scheduler_id,
+ )
+ )
+ .values(
+ trigger=p_trigger,
+ next_fire_time=p_next_fire_time,
+ acquired_by=None,
+ acquired_until=None,
+ )
+ )
+ next_fire_times = {
+ arg["p_id"]: arg["p_next_fire_time"] for arg in update_args
+ }
if self._supports_update_returning:
update = update.returning(self.t_schedules.c.id)
- updated_ids = [row[0] for row in conn.execute(update, update_args)]
+ updated_ids = [
+ row[0] for row in conn.execute(update, update_args)
+ ]
else:
# TODO: actually check which rows were updated?
conn.execute(update, update_args)
updated_ids = list(next_fire_times)
for schedule_id in updated_ids:
- event = ScheduleUpdated(schedule_id=schedule_id,
- next_fire_time=next_fire_times[schedule_id])
+ event = ScheduleUpdated(
+ schedule_id=schedule_id,
+ next_fire_time=next_fire_times[schedule_id],
+ )
update_events.append(event)
# Remove schedules that have no next fire time or failed to serialize
if finished_schedule_ids:
- delete = self.t_schedules.delete().\
- where(self.t_schedules.c.id.in_(finished_schedule_ids))
+ delete = self.t_schedules.delete().where(
+ self.t_schedules.c.id.in_(finished_schedule_ids)
+ )
conn.execute(delete)
for event in update_events:
@@ -442,10 +535,12 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
self._events.publish(ScheduleRemoved(schedule_id=schedule_id))
def get_next_schedule_run_time(self) -> datetime | None:
- query = select(self.t_schedules.c.next_fire_time).\
- where(self.t_schedules.c.next_fire_time.isnot(None)).\
- order_by(self.t_schedules.c.next_fire_time).\
- limit(1)
+ query = (
+ select(self.t_schedules.c.next_fire_time)
+ .where(self.t_schedules.c.next_fire_time.isnot(None))
+ .order_by(self.t_schedules.c.next_fire_time)
+ .limit(1)
+ )
for attempt in self._retrying:
with attempt, self.engine.begin() as conn:
result = conn.execute(query)
@@ -458,8 +553,12 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
with attempt, self.engine.begin() as conn:
conn.execute(insert)
- event = JobAdded(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
- tags=job.tags)
+ event = JobAdded(
+ job_id=job.id,
+ task_id=job.task_id,
+ schedule_id=job.schedule_id,
+ tags=job.tags,
+ )
self._events.publish(event)
def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]:
@@ -478,13 +577,19 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
with attempt, self.engine.begin() as conn:
now = datetime.now(timezone.utc)
acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
- query = self.t_jobs.select().\
- join(self.t_tasks, self.t_tasks.c.id == self.t_jobs.c.task_id).\
- where(or_(self.t_jobs.c.acquired_until.is_(None),
- self.t_jobs.c.acquired_until < now)).\
- order_by(self.t_jobs.c.created_at).\
- with_for_update(skip_locked=True).\
- limit(limit)
+ query = (
+ self.t_jobs.select()
+ .join(self.t_tasks, self.t_tasks.c.id == self.t_jobs.c.task_id)
+ .where(
+ or_(
+ self.t_jobs.c.acquired_until.is_(None),
+ self.t_jobs.c.acquired_until < now,
+ )
+ )
+ .order_by(self.t_jobs.c.created_at)
+ .with_for_update(skip_locked=True)
+ .limit(limit)
+ )
result = conn.execute(query)
if not result:
@@ -495,10 +600,15 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
task_ids: set[str] = {job.task_id for job in jobs}
# Retrieve the limits
- query = select([self.t_tasks.c.id,
- self.t_tasks.c.max_running_jobs - self.t_tasks.c.running_jobs]).\
- where(self.t_tasks.c.max_running_jobs.isnot(None),
- self.t_tasks.c.id.in_(task_ids))
+ query = select(
+ [
+ self.t_tasks.c.id,
+ self.t_tasks.c.max_running_jobs - self.t_tasks.c.running_jobs,
+ ]
+ ).where(
+ self.t_tasks.c.max_running_jobs.isnot(None),
+ self.t_tasks.c.id.in_(task_ids),
+ )
result = conn.execute(query)
job_slots_left = dict(result.fetchall())
@@ -519,19 +629,25 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
if acquired_jobs:
# Mark the acquired jobs as acquired by this worker
acquired_job_ids = [job.id for job in acquired_jobs]
- update = self.t_jobs.update().\
- values(acquired_by=worker_id, acquired_until=acquired_until).\
- where(self.t_jobs.c.id.in_(acquired_job_ids))
+ update = (
+ self.t_jobs.update()
+ .values(acquired_by=worker_id, acquired_until=acquired_until)
+ .where(self.t_jobs.c.id.in_(acquired_job_ids))
+ )
conn.execute(update)
# Increment the running job counters on each task
- p_id: BindParameter = bindparam('p_id')
- p_increment: BindParameter = bindparam('p_increment')
- params = [{'p_id': task_id, 'p_increment': increment}
- for task_id, increment in increments.items()]
- update = self.t_tasks.update().\
- values(running_jobs=self.t_tasks.c.running_jobs + p_increment).\
- where(self.t_tasks.c.id == p_id)
+ p_id: BindParameter = bindparam("p_id")
+ p_increment: BindParameter = bindparam("p_increment")
+ params = [
+ {"p_id": task_id, "p_increment": increment}
+ for task_id, increment in increments.items()
+ ]
+ update = (
+ self.t_tasks.update()
+ .values(running_jobs=self.t_tasks.c.running_jobs + p_increment)
+ .where(self.t_tasks.c.id == p_id)
+ )
conn.execute(update, params)
# Publish the appropriate events
@@ -549,9 +665,11 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
conn.execute(insert)
# Decrement the running jobs counter
- update = self.t_tasks.update().\
- values(running_jobs=self.t_tasks.c.running_jobs - 1).\
- where(self.t_tasks.c.id == task_id)
+ update = (
+ self.t_tasks.update()
+ .values(running_jobs=self.t_tasks.c.running_jobs - 1)
+ .where(self.t_tasks.c.id == task_id)
+ )
conn.execute(update)
# Delete the job
@@ -560,20 +678,26 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
# Publish the event
self._events.publish(
- JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ JobReleased(
+ job_id=result.job_id, worker_id=worker_id, outcome=result.outcome
+ )
)
def get_job_result(self, job_id: UUID) -> JobResult | None:
for attempt in self._retrying:
with attempt, self.engine.begin() as conn:
# Retrieve the result
- query = self.t_job_results.select().\
- where(self.t_job_results.c.job_id == job_id)
+ query = self.t_job_results.select().where(
+ self.t_job_results.c.job_id == job_id
+ )
row = conn.execute(query).fetchone()
# Delete the result
- delete = self.t_job_results.delete().\
- where(self.t_job_results.c.job_id == job_id)
+ delete = self.t_job_results.delete().where(
+ self.t_job_results.c.job_id == job_id
+ )
conn.execute(delete)
- return JobResult.unmarshal(self.serializer, row._asdict()) if row else None
+ return (
+ JobResult.unmarshal(self.serializer, row._asdict()) if row else None
+ )
diff --git a/src/apscheduler/eventbrokers/async_adapter.py b/src/apscheduler/eventbrokers/async_adapter.py
index 3f1e13f..ebceaad 100644
--- a/src/apscheduler/eventbrokers/async_adapter.py
+++ b/src/apscheduler/eventbrokers/async_adapter.py
@@ -26,7 +26,9 @@ class AsyncEventBrokerAdapter(LocalAsyncEventBroker):
self._exit_stack.enter_async_context(self.portal)
await to_thread.run_sync(self.original.__enter__)
- self._exit_stack.push_async_exit(partial(to_thread.run_sync, self.original.__exit__))
+ self._exit_stack.push_async_exit(
+ partial(to_thread.run_sync, self.original.__exit__)
+ )
# Relay events from the original broker to this one
self._exit_stack.enter_context(
diff --git a/src/apscheduler/eventbrokers/async_local.py b/src/apscheduler/eventbrokers/async_local.py
index 753505c..98031bf 100644
--- a/src/apscheduler/eventbrokers/async_local.py
+++ b/src/apscheduler/eventbrokers/async_local.py
@@ -39,8 +39,13 @@ class LocalAsyncEventBroker(AsyncEventBroker, BaseEventBroker):
event_type = type(event)
one_shot_tokens: list[object] = []
for _token, subscription in self._subscriptions.items():
- if subscription.event_types is None or event_type in subscription.event_types:
- self._task_group.start_soon(self._deliver_event, subscription.callback, event)
+ if (
+ subscription.event_types is None
+ or event_type in subscription.event_types
+ ):
+ self._task_group.start_soon(
+ self._deliver_event, subscription.callback, event
+ )
if subscription.one_shot:
one_shot_tokens.append(subscription.token)
@@ -53,4 +58,6 @@ class LocalAsyncEventBroker(AsyncEventBroker, BaseEventBroker):
if iscoroutine(retval):
await retval
except BaseException:
- self._logger.exception('Error delivering %s event', event.__class__.__name__)
+ self._logger.exception(
+ "Error delivering %s event", event.__class__.__name__
+ )
diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py
index ca58da9..cc485db 100644
--- a/src/apscheduler/eventbrokers/asyncpg.py
+++ b/src/apscheduler/eventbrokers/asyncpg.py
@@ -22,7 +22,7 @@ if TYPE_CHECKING:
@attrs.define(eq=False)
class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
connection_factory: Callable[[], AsyncContextManager[Connection]]
- channel: str = attrs.field(kw_only=True, default='apscheduler')
+ channel: str = attrs.field(kw_only=True, default="apscheduler")
max_idle_time: float = attrs.field(kw_only=True, default=30)
@classmethod
@@ -31,9 +31,11 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
@classmethod
def from_async_sqla_engine(cls, engine: AsyncEngine) -> AsyncpgEventBroker:
- if engine.dialect.driver != 'asyncpg':
- raise ValueError(f'The driver in the engine must be "asyncpg" (current: '
- f'{engine.dialect.driver})')
+ if engine.dialect.driver != "asyncpg":
+ raise ValueError(
+ f'The driver in the engine must be "asyncpg" (current: '
+ f"{engine.dialect.driver})"
+ )
@asynccontextmanager
async def connection_factory() -> AsyncGenerator[Connection, None]:
@@ -68,14 +70,16 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
try:
while True:
await sleep(self.max_idle_time)
- await conn.execute('SELECT 1')
+ await conn.execute("SELECT 1")
finally:
await conn.remove_listener(self.channel, callback)
async def publish(self, event: Event) -> None:
notification = self.generate_notification_str(event)
if len(notification) > 7999:
- raise SerializationError('Serialized event object exceeds 7999 bytes in size')
+ raise SerializationError(
+ "Serialized event object exceeds 7999 bytes in size"
+ )
async with self.connection_factory() as conn:
await conn.execute("SELECT pg_notify($1, $2)", self.channel, notification)
diff --git a/src/apscheduler/eventbrokers/base.py b/src/apscheduler/eventbrokers/base.py
index c917d2f..38c83c7 100644
--- a/src/apscheduler/eventbrokers/base.py
+++ b/src/apscheduler/eventbrokers/base.py
@@ -27,14 +27,20 @@ class LocalSubscription(Subscription):
@attrs.define(eq=False)
class BaseEventBroker(EventBroker):
_logger: Logger = attrs.field(init=False)
- _subscriptions: dict[object, LocalSubscription] = attrs.field(init=False, factory=dict)
+ _subscriptions: dict[object, LocalSubscription] = attrs.field(
+ init=False, factory=dict
+ )
def __attrs_post_init__(self) -> None:
self._logger = getLogger(self.__class__.__module__)
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Iterable[type[Event]] | None = None, *,
- one_shot: bool = False) -> Subscription:
+ def subscribe(
+ self,
+ callback: Callable[[Event], Any],
+ event_types: Iterable[type[Event]] | None = None,
+ *,
+ one_shot: bool = False,
+ ) -> Subscription:
types = set(event_types) if event_types else None
token = object()
subscription = LocalSubscription(callback, types, one_shot, token, self)
@@ -51,48 +57,54 @@ class DistributedEventBrokerMixin:
def generate_notification(self, event: Event) -> bytes:
serialized = self.serializer.serialize(attrs.asdict(event))
- return event.__class__.__name__.encode('ascii') + b' ' + serialized
+ return event.__class__.__name__.encode("ascii") + b" " + serialized
def generate_notification_str(self, event: Event) -> str:
serialized = self.serializer.serialize(attrs.asdict(event))
- return event.__class__.__name__ + ' ' + b64encode(serialized).decode('ascii')
+ return event.__class__.__name__ + " " + b64encode(serialized).decode("ascii")
def _reconstitute_event(self, event_type: str, serialized: bytes) -> Event | None:
try:
kwargs = self.serializer.deserialize(serialized)
except DeserializationError:
- self._logger.exception('Failed to deserialize an event of type %s', event_type,
- serialized=serialized)
+ self._logger.exception(
+ "Failed to deserialize an event of type %s",
+ event_type,
+ serialized=serialized,
+ )
return None
try:
event_class = getattr(events, event_type)
except AttributeError:
- self._logger.error('Receive notification for a nonexistent event type: %s',
- event_type, serialized=serialized)
+ self._logger.error(
+ "Receive notification for a nonexistent event type: %s",
+ event_type,
+ serialized=serialized,
+ )
return None
try:
return event_class(**kwargs)
except Exception:
- self._logger.exception('Error reconstituting event of type %s', event_type)
+ self._logger.exception("Error reconstituting event of type %s", event_type)
return None
def reconstitute_event(self, payload: bytes) -> Event | None:
try:
- event_type_bytes, serialized = payload.split(b' ', 1)
+ event_type_bytes, serialized = payload.split(b" ", 1)
except ValueError:
- self._logger.error('Received malformatted notification', payload=payload)
+ self._logger.error("Received malformatted notification", payload=payload)
return None
- event_type = event_type_bytes.decode('ascii', errors='replace')
+ event_type = event_type_bytes.decode("ascii", errors="replace")
return self._reconstitute_event(event_type, serialized)
def reconstitute_event_str(self, payload: str) -> Event | None:
try:
- event_type, b64_serialized = payload.split(' ', 1)
+ event_type, b64_serialized = payload.split(" ", 1)
except ValueError:
- self._logger.error('Received malformatted notification', payload=payload)
+ self._logger.error("Received malformatted notification", payload=payload)
return None
return self._reconstitute_event(event_type, b64decode(b64_serialized))
diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py
index f780aae..cf19526 100644
--- a/src/apscheduler/eventbrokers/local.py
+++ b/src/apscheduler/eventbrokers/local.py
@@ -30,12 +30,18 @@ class LocalEventBroker(BaseEventBroker):
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
del self._executor
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Iterable[type[Event]] | None = None, *,
- one_shot: bool = False) -> Subscription:
+ def subscribe(
+ self,
+ callback: Callable[[Event], Any],
+ event_types: Iterable[type[Event]] | None = None,
+ *,
+ one_shot: bool = False,
+ ) -> Subscription:
if iscoroutinefunction(callback):
- raise ValueError('Coroutine functions are not supported as callbacks on a synchronous '
- 'event source')
+ raise ValueError(
+ "Coroutine functions are not supported as callbacks on a synchronous "
+ "event source"
+ )
with self._subscriptions_lock:
return super().subscribe(callback, event_types, one_shot=one_shot)
@@ -52,8 +58,13 @@ class LocalEventBroker(BaseEventBroker):
with self._subscriptions_lock:
one_shot_tokens: list[object] = []
for _token, subscription in self._subscriptions.items():
- if subscription.event_types is None or event_type in subscription.event_types:
- self._executor.submit(self._deliver_event, subscription.callback, event)
+ if (
+ subscription.event_types is None
+ or event_type in subscription.event_types
+ ):
+ self._executor.submit(
+ self._deliver_event, subscription.callback, event
+ )
if subscription.one_shot:
one_shot_tokens.append(subscription.token)
@@ -64,4 +75,6 @@ class LocalEventBroker(BaseEventBroker):
try:
func(event)
except BaseException:
- self._logger.exception('Error delivering %s event', event.__class__.__name__)
+ self._logger.exception(
+ "Error delivering %s event", event.__class__.__name__
+ )
diff --git a/src/apscheduler/eventbrokers/mqtt.py b/src/apscheduler/eventbrokers/mqtt.py
index 04db95d..8b0a512 100644
--- a/src/apscheduler/eventbrokers/mqtt.py
+++ b/src/apscheduler/eventbrokers/mqtt.py
@@ -21,9 +21,9 @@ from .local import LocalEventBroker
class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
client: Client
serializer: Serializer = attrs.field(factory=JSONSerializer)
- host: str = attrs.field(kw_only=True, default='localhost')
+ host: str = attrs.field(kw_only=True, default="localhost")
port: int = attrs.field(kw_only=True, default=1883)
- topic: str = attrs.field(kw_only=True, default='apscheduler')
+ topic: str = attrs.field(kw_only=True, default="apscheduler")
subscribe_qos: int = attrs.field(kw_only=True, default=0)
publish_qos: int = attrs.field(kw_only=True, default=0)
_ready_future: Future[None] = attrs.field(init=False)
@@ -38,19 +38,29 @@ class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
self.client.connect(self.host, self.port)
self.client.loop_start()
self._ready_future.result(10)
- self._exit_stack.push(lambda exc_type, *_: self.client.loop_stop(force=bool(exc_type)))
+ self._exit_stack.push(
+ lambda exc_type, *_: self.client.loop_stop(force=bool(exc_type))
+ )
self._exit_stack.callback(self.client.disconnect)
return self
- def _on_connect(self, client: Client, userdata: Any, flags: dict[str, Any],
- rc: ReasonCodes | int, properties: Properties | None = None) -> None:
+ def _on_connect(
+ self,
+ client: Client,
+ userdata: Any,
+ flags: dict[str, Any],
+ rc: ReasonCodes | int,
+ properties: Properties | None = None,
+ ) -> None:
try:
client.subscribe(self.topic, qos=self.subscribe_qos)
except Exception as exc:
self._ready_future.set_exception(exc)
raise
- def _on_subscribe(self, client: Client, userdata: Any, mid, granted_qos: list[int]) -> None:
+ def _on_subscribe(
+ self, client: Client, userdata: Any, mid, granted_qos: list[int]
+ ) -> None:
self._ready_future.set_result(None)
def _on_message(self, client: Client, userdata: Any, msg: MQTTMessage) -> None:
diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py
index 5211181..0f14e3e 100644
--- a/src/apscheduler/eventbrokers/redis.py
+++ b/src/apscheduler/eventbrokers/redis.py
@@ -19,7 +19,7 @@ from .local import LocalEventBroker
class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
client: Redis
serializer: Serializer = attrs.field(factory=JSONSerializer)
- channel: str = attrs.field(kw_only=True, default='apscheduler')
+ channel: str = attrs.field(kw_only=True, default="apscheduler")
message_poll_interval: float = attrs.field(kw_only=True, default=0.05)
_stopped: bool = attrs.field(init=False, default=True)
_ready_future: Future[None] = attrs.field(init=False)
@@ -33,7 +33,9 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
def __enter__(self):
self._stopped = False
self._ready_future = Future()
- self._thread = Thread(target=self._listen_messages, daemon=True, name='Redis subscriber')
+ self._thread = Thread(
+ target=self._listen_messages, daemon=True, name="Redis subscriber"
+ )
self._thread.start()
self._ready_future.result(10)
return super().__enter__()
@@ -62,12 +64,12 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
try:
while not self._stopped:
msg = pubsub.get_message(timeout=self.message_poll_interval)
- if msg and isinstance(msg['data'], bytes):
- event = self.reconstitute_event(msg['data'])
+ if msg and isinstance(msg["data"], bytes):
+ event = self.reconstitute_event(msg["data"])
if event is not None:
self.publish_local(event)
except BaseException:
- self._logger.exception('Subscriber crashed')
+ self._logger.exception("Subscriber crashed")
raise
finally:
pubsub.close()
diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py
index f38c12e..408089e 100644
--- a/src/apscheduler/events.py
+++ b/src/apscheduler/events.py
@@ -13,14 +13,16 @@ from .enums import JobOutcome
@attrs.define(kw_only=True, frozen=True)
class Event:
- timestamp: datetime = attrs.field(factory=partial(datetime.now, timezone.utc),
- converter=as_aware_datetime)
+ timestamp: datetime = attrs.field(
+ factory=partial(datetime.now, timezone.utc), converter=as_aware_datetime
+ )
#
# Data store events
#
+
@attrs.define(kw_only=True, frozen=True)
class DataStoreEvent(Event):
pass
@@ -87,6 +89,7 @@ class JobDeserializationFailed(DataStoreEvent):
# Scheduler events
#
+
@attrs.define(kw_only=True, frozen=True)
class SchedulerEvent(Event):
pass
@@ -106,6 +109,7 @@ class SchedulerStopped(SchedulerEvent):
# Worker events
#
+
@attrs.define(kw_only=True, frozen=True)
class WorkerEvent(Event):
pass
diff --git a/src/apscheduler/exceptions.py b/src/apscheduler/exceptions.py
index 64fbf6a..21af6cd 100644
--- a/src/apscheduler/exceptions.py
+++ b/src/apscheduler/exceptions.py
@@ -7,21 +7,21 @@ class TaskLookupError(LookupError):
"""Raised by a data store when it cannot find the requested task."""
def __init__(self, task_id: str):
- super().__init__(f'No task by the id of {task_id!r} was found')
+ super().__init__(f"No task by the id of {task_id!r} was found")
class JobLookupError(KeyError):
"""Raised when the job store cannot find a job for update or removal."""
def __init__(self, job_id: UUID):
- super().__init__(f'No job by the id of {job_id} was found')
+ super().__init__(f"No job by the id of {job_id} was found")
class JobResultNotReady(KeyError):
"""Raised by ``get_job_result()`` if the job result is not ready."""
def __init__(self, job_id: UUID):
- super().__init__(f'No job by the id of {job_id} was found')
+ super().__init__(f"No job by the id of {job_id} was found")
class JobCancelled(Exception):
@@ -40,7 +40,8 @@ class ConflictingIdError(KeyError):
def __init__(self, schedule_id):
super().__init__(
- f'This data store already contains a schedule with the identifier {schedule_id!r}')
+ f"This data store already contains a schedule with the identifier {schedule_id!r}"
+ )
class TransientJobError(ValueError):
@@ -51,8 +52,9 @@ class TransientJobError(ValueError):
def __init__(self, job_id):
super().__init__(
- f'Job ({job_id}) cannot be added to this job store because a reference to the '
- f'callable could not be determined.')
+ f"Job ({job_id}) cannot be added to this job store because a reference to the "
+ f"callable could not be determined."
+ )
class SerializationError(Exception):
@@ -74,11 +76,11 @@ class SchedulerAlreadyRunningError(Exception):
"""Raised when attempting to start or configure the scheduler when it's already running."""
def __str__(self):
- return 'Scheduler is already running'
+ return "Scheduler is already running"
class SchedulerNotRunningError(Exception):
"""Raised when attempting to shutdown the scheduler when it's not running."""
def __str__(self):
- return 'Scheduler is not running'
+ return "Scheduler is not running"
diff --git a/src/apscheduler/marshalling.py b/src/apscheduler/marshalling.py
index 00a9277..63d4994 100644
--- a/src/apscheduler/marshalling.py
+++ b/src/apscheduler/marshalling.py
@@ -14,7 +14,10 @@ else:
def marshal_object(obj) -> tuple[str, Any]:
- return f'{obj.__class__.__module__}:{obj.__class__.__qualname__}', obj.__getstate__()
+ return (
+ f"{obj.__class__.__module__}:{obj.__class__.__qualname__}",
+ obj.__getstate__(),
+ )
def unmarshal_object(ref: str, state):
@@ -60,12 +63,13 @@ def unmarshal_date(value):
def marshal_timezone(value: tzinfo) -> str:
if isinstance(value, ZoneInfo):
return value.key
- elif hasattr(value, 'zone'): # pytz timezones
+ elif hasattr(value, "zone"): # pytz timezones
return value.zone
raise SerializationError(
- f'Unserializable time zone: {value!r}\n'
- f'Only time zones from the zoneinfo or pytz modules can be serialized.')
+ f"Unserializable time zone: {value!r}\n"
+ f"Only time zones from the zoneinfo or pytz modules can be serialized."
+ )
def unmarshal_timezone(value: str) -> ZoneInfo:
@@ -81,18 +85,18 @@ def callable_to_ref(func: Callable) -> str:
"""
if isinstance(func, partial):
- raise SerializationError('Cannot create a reference to a partial()')
+ raise SerializationError("Cannot create a reference to a partial()")
- if not hasattr(func, '__module__'):
- raise SerializationError('Callable has no __module__ attribute')
- if not hasattr(func, '__qualname__'):
- raise SerializationError('Callable has no __qualname__ attribute')
- if '<lambda>' in func.__qualname__:
- raise SerializationError('Cannot create a reference to a lambda')
- if '<locals>' in func.__qualname__:
- raise SerializationError('Cannot create a reference to a nested function')
+ if not hasattr(func, "__module__"):
+ raise SerializationError("Callable has no __module__ attribute")
+ if not hasattr(func, "__qualname__"):
+ raise SerializationError("Callable has no __qualname__ attribute")
+ if "<lambda>" in func.__qualname__:
+ raise SerializationError("Cannot create a reference to a lambda")
+ if "<locals>" in func.__qualname__:
+ raise SerializationError("Cannot create a reference to a nested function")
- return f'{func.__module__}:{func.__qualname__}'
+ return f"{func.__module__}:{func.__qualname__}"
def callable_from_ref(ref: str) -> Callable:
@@ -103,23 +107,27 @@ def callable_from_ref(ref: str) -> Callable:
not callable
"""
- if ':' not in ref:
- raise ValueError(f'Invalid reference: {ref}')
+ if ":" not in ref:
+ raise ValueError(f"Invalid reference: {ref}")
- modulename, rest = ref.split(':', 1)
+ modulename, rest = ref.split(":", 1)
try:
obj = __import__(modulename, fromlist=[rest])
except ImportError:
- raise LookupError(f'Error resolving reference {ref!r}: could not import module')
+ raise LookupError(f"Error resolving reference {ref!r}: could not import module")
try:
- for name in rest.split('.'):
+ for name in rest.split("."):
obj = getattr(obj, name)
except Exception:
- raise DeserializationError(f'Error resolving reference {ref!r}: error looking up object')
+ raise DeserializationError(
+ f"Error resolving reference {ref!r}: error looking up object"
+ )
if not callable(obj):
- raise DeserializationError(f'{ref!r} points to an object of type '
- f'{obj.__class__.__qualname__} which is not callable')
+ raise DeserializationError(
+ f"{ref!r} points to an object of type "
+ f"{obj.__class__.__qualname__} which is not callable"
+ )
return obj
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 2f6c418..72c5994 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -11,7 +11,12 @@ from uuid import UUID, uuid4
import anyio
import attrs
-from anyio import TASK_STATUS_IGNORED, create_task_group, get_cancelled_exc_class, move_on_after
+from anyio import (
+ TASK_STATUS_IGNORED,
+ create_task_group,
+ get_cancelled_exc_class,
+ move_on_after,
+)
from ..abc import AsyncDataStore, EventSource, Job, Schedule, Trigger
from ..context import current_scheduler
@@ -20,7 +25,13 @@ from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
+ Event,
+ JobReleased,
+ ScheduleAdded,
+ SchedulerStarted,
+ SchedulerStopped,
+ ScheduleUpdated,
+)
from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError
from ..marshalling import callable_to_ref
from ..structures import JobResult, Task
@@ -34,7 +45,9 @@ _zero_timedelta = timedelta()
class AsyncScheduler:
"""An asynchronous (AnyIO based) scheduler implementation."""
- data_store: AsyncDataStore = attrs.field(converter=as_async_datastore, factory=MemoryDataStore)
+ data_store: AsyncDataStore = attrs.field(
+ converter=as_async_datastore, factory=MemoryDataStore
+ )
identity: str = attrs.field(kw_only=True, default=None)
start_worker: bool = attrs.field(kw_only=True, default=True)
logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
@@ -43,12 +56,14 @@ class AsyncScheduler:
_wakeup_event: anyio.Event = attrs.field(init=False)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
_worker: AsyncWorker | None = attrs.field(init=False, default=None)
- _events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker)
+ _events: LocalAsyncEventBroker = attrs.field(
+ init=False, factory=LocalAsyncEventBroker
+ )
_exit_stack: AsyncExitStack = attrs.field(init=False)
def __attrs_post_init__(self) -> None:
if not self.identity:
- self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}'
+ self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
@property
def events(self) -> EventSource:
@@ -67,7 +82,9 @@ class AsyncScheduler:
# Initialize the data store and start relaying events to the scheduler's event broker
await self._exit_stack.enter_async_context(self.data_store)
- self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(self._events.publish)
+ )
# Wake up the scheduler if the data store emits a significant schedule event
self._exit_stack.enter_context(
@@ -99,22 +116,28 @@ class AsyncScheduler:
del self._wakeup_event
def _schedule_added_or_modified(self, event: Event) -> None:
- event_ = cast('ScheduleAdded | ScheduleUpdated', event)
- if (
- not self._wakeup_deadline
- or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline)
+ event_ = cast("ScheduleAdded | ScheduleUpdated", event)
+ if not self._wakeup_deadline or (
+ event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline
):
- self.logger.debug('Detected a %s event – waking up the scheduler',
- type(event).__name__)
+ self.logger.debug(
+ "Detected a %s event – waking up the scheduler", type(event).__name__
+ )
self._wakeup_event.set()
async def add_schedule(
- self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None,
- args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None,
+ self,
+ func_or_task_id: str | Callable,
+ trigger: Trigger,
+ *,
+ id: str | None = None,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None = None,
- max_jitter: float | timedelta | None = None, tags: Iterable[str] | None = None,
- conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing
+ max_jitter: float | timedelta | None = None,
+ tags: Iterable[str] | None = None,
+ conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing,
) -> str:
id = id or str(uuid4())
args = tuple(args or ())
@@ -129,13 +152,25 @@ class AsyncScheduler:
else:
task = await self.data_store.get_task(func_or_task_id)
- schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs,
- coalesce=coalesce, misfire_grace_time=misfire_grace_time,
- max_jitter=max_jitter, tags=tags)
+ schedule = Schedule(
+ id=id,
+ task_id=task.id,
+ trigger=trigger,
+ args=args,
+ kwargs=kwargs,
+ coalesce=coalesce,
+ misfire_grace_time=misfire_grace_time,
+ max_jitter=max_jitter,
+ tags=tags,
+ )
schedule.next_fire_time = trigger.next()
await self.data_store.add_schedule(schedule, conflict_policy)
- self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task,
- trigger, schedule.next_fire_time)
+ self.logger.info(
+ "Added new schedule (task=%r, trigger=%r); next run time at %s",
+ task,
+ trigger,
+ schedule.next_fire_time,
+ )
return schedule.id
async def get_schedule(self, id: str) -> Schedule:
@@ -146,8 +181,12 @@ class AsyncScheduler:
await self.data_store.remove_schedules({schedule_id})
async def add_job(
- self, func_or_task_id: str | Callable, *, args: Iterable | None = None,
- kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None
+ self,
+ func_or_task_id: str | Callable,
+ *,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
+ tags: Iterable[str] | None = None,
) -> UUID:
"""
Add a job to the data store.
@@ -165,7 +204,12 @@ class AsyncScheduler:
else:
task = await self.data_store.get_task(func_or_task_id)
- job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset())
+ job = Job(
+ task_id=task.id,
+ args=args or (),
+ kwargs=kwargs or {},
+ tags=tags or frozenset(),
+ )
await self.data_store.add_job(job)
return job.id
@@ -199,8 +243,12 @@ class AsyncScheduler:
return result
async def run_job(
- self, func_or_task_id: str | Callable, *, args: Iterable | None = None,
- kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = ()
+ self,
+ func_or_task_id: str | Callable,
+ *,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
+ tags: Iterable[str] | None = (),
) -> Any:
"""
Convenience method to add a job and then return its result (or raise its exception).
@@ -216,7 +264,9 @@ class AsyncScheduler:
job_id: UUID | None = None
with self.data_store.events.subscribe(listener, {JobReleased}):
- job_id = await self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags)
+ job_id = await self.add_job(
+ func_or_task_id, args=args, kwargs=kwargs, tags=tags
+ )
await job_complete_event.wait()
result = await self.get_job_result(job_id)
@@ -229,12 +279,14 @@ class AsyncScheduler:
elif result.outcome is JobOutcome.cancelled:
raise JobCancelled
else:
- raise RuntimeError(f'Unknown job outcome: {result.outcome}')
+ raise RuntimeError(f"Unknown job outcome: {result.outcome}")
async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
if self._state is not RunState.starting:
- raise RuntimeError(f'This function cannot be called while the scheduler is in the '
- f'{self._state} state')
+ raise RuntimeError(
+ f"This function cannot be called while the scheduler is in the "
+ f"{self._state} state"
+ )
# Signal that the scheduler has started
self._state = RunState.started
@@ -255,8 +307,11 @@ class AsyncScheduler:
fire_time = calculate_next()
except Exception:
self.logger.exception(
- 'Error computing next fire time for schedule %r of task %r – '
- 'removing schedule', schedule.id, schedule.task_id)
+ "Error computing next fire time for schedule %r of task %r – "
+ "removing schedule",
+ schedule.id,
+ schedule.task_id,
+ )
break
# Stop if the calculated fire time is in the future
@@ -271,7 +326,11 @@ class AsyncScheduler:
fire_times[0] = fire_time
# Add one or more jobs to the job queue
- max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0
+ max_jitter = (
+ schedule.max_jitter.total_seconds()
+ if schedule.max_jitter
+ else 0
+ )
for i, fire_time in enumerate(fire_times):
# Calculate a jitter if max_jitter > 0
jitter = _zero_timedelta
@@ -284,19 +343,30 @@ class AsyncScheduler:
if next_fire_time is not None:
# Jitter must never be so high that it would cause a fire time to
# equal or exceed the next fire time
- jitter_s = min([
- max_jitter,
- (next_fire_time - fire_time
- - _microsecond_delta).total_seconds()
- ])
+ jitter_s = min(
+ [
+ max_jitter,
+ (
+ next_fire_time
+ - fire_time
+ - _microsecond_delta
+ ).total_seconds(),
+ ]
+ )
jitter = timedelta(seconds=random.uniform(0, jitter_s))
fire_time += jitter
schedule.last_fire_time = fire_time
- job = Job(task_id=schedule.task_id, args=schedule.args,
- kwargs=schedule.kwargs, schedule_id=schedule.id,
- scheduled_fire_time=fire_time, jitter=jitter,
- start_deadline=schedule.next_deadline, tags=schedule.tags)
+ job = Job(
+ task_id=schedule.task_id,
+ args=schedule.args,
+ kwargs=schedule.kwargs,
+ schedule_id=schedule.id,
+ scheduled_fire_time=fire_time,
+ jitter=jitter,
+ start_deadline=schedule.next_deadline,
+ tags=schedule.tags,
+ )
await self.data_store.add_job(job)
# Update the schedules (and release the scheduler's claim on them)
@@ -306,29 +376,34 @@ class AsyncScheduler:
# schedule is due or the scheduler is explicitly woken up
wait_time = None
if len(schedules) < 100:
- self._wakeup_deadline = await self.data_store.get_next_schedule_run_time()
+ self._wakeup_deadline = (
+ await self.data_store.get_next_schedule_run_time()
+ )
if self._wakeup_deadline:
wait_time = (
self._wakeup_deadline - datetime.now(timezone.utc)
).total_seconds()
- self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)',
- wait_time, self._wakeup_deadline)
+ self.logger.debug(
+ "Sleeping %.3f seconds until the next fire time (%s)",
+ wait_time,
+ self._wakeup_deadline,
+ )
else:
- self.logger.debug('Waiting for any due schedules to appear')
+ self.logger.debug("Waiting for any due schedules to appear")
with move_on_after(wait_time):
await self._wakeup_event.wait()
self._wakeup_event = anyio.Event()
else:
- self.logger.debug('Processing more schedules on the next iteration')
+ self.logger.debug("Processing more schedules on the next iteration")
except get_cancelled_exc_class():
pass
except BaseException as exc:
- self.logger.exception('Scheduler crashed')
+ self.logger.exception("Scheduler crashed")
exception = exc
raise
else:
- self.logger.info('Scheduler stopped')
+ self.logger.info("Scheduler stopped")
finally:
self._state = RunState.stopped
with move_on_after(3, shield=True):
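Side note on the jitter hunk reformatted above: the clamping is easiest to verify with concrete numbers. The standalone sketch below is illustrative only (not part of this commit; all values are made up) and reproduces the same arithmetic, showing why a jittered fire time can never reach the next fire time.

import random
from datetime import datetime, timedelta, timezone

max_jitter = 15.0  # hypothetical schedule.max_jitter, in seconds
fire_time = datetime(2022, 4, 20, 1, 0, 0, tzinfo=timezone.utc)
next_fire_time = fire_time + timedelta(seconds=10)  # next occurrence only 10 s away
_microsecond_delta = timedelta(microseconds=1)

# Cap the jitter so that fire_time + jitter stays strictly below next_fire_time
jitter_s = min(
    [
        max_jitter,
        (next_fire_time - fire_time - _microsecond_delta).total_seconds(),
    ]
)
jitter = timedelta(seconds=random.uniform(0, jitter_s))
assert fire_time + jitter < next_fire_time  # jitter_s was capped at 9.999999 s, not 15 s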
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 3e9f196..c78d93c 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -19,7 +19,13 @@ from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
+ Event,
+ JobReleased,
+ ScheduleAdded,
+ SchedulerStarted,
+ SchedulerStopped,
+ ScheduleUpdated,
+)
from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError
from ..marshalling import callable_to_ref
from ..structures import Job, JobResult, Schedule, Task
@@ -47,7 +53,7 @@ class Scheduler:
def __attrs_post_init__(self) -> None:
if not self.identity:
- self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}'
+ self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
@property
def events(self) -> EventSource:
@@ -70,7 +76,9 @@ class Scheduler:
# Initialize the data store and start relaying events to the scheduler's event broker
self._exit_stack.enter_context(self.data_store)
- self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(self._events.publish)
+ )
# Wake up the scheduler if the data store emits a significant schedule event
self._exit_stack.enter_context(
@@ -92,7 +100,9 @@ class Scheduler:
start_future: Future[Event] = Future()
with self._events.subscribe(start_future.set_result, one_shot=True):
executor = ThreadPoolExecutor(1)
- self._exit_stack.push(lambda exc_type, *args: executor.shutdown(wait=exc_type is None))
+ self._exit_stack.push(
+ lambda exc_type, *args: executor.shutdown(wait=exc_type is None)
+ )
run_future = executor.submit(self.run)
wait([start_future, run_future], return_when=FIRST_COMPLETED)
@@ -109,22 +119,28 @@ class Scheduler:
del self._wakeup_event
def _schedule_added_or_modified(self, event: Event) -> None:
- event_ = cast('ScheduleAdded | ScheduleUpdated', event)
- if (
- not self._wakeup_deadline
- or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline)
+ event_ = cast("ScheduleAdded | ScheduleUpdated", event)
+ if not self._wakeup_deadline or (
+ event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline
):
- self.logger.debug('Detected a %s event – waking up the scheduler',
- type(event).__name__)
+ self.logger.debug(
+ "Detected a %s event – waking up the scheduler", type(event).__name__
+ )
self._wakeup_event.set()
def add_schedule(
- self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None,
- args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None,
+ self,
+ func_or_task_id: str | Callable,
+ trigger: Trigger,
+ *,
+ id: str | None = None,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None = None,
- max_jitter: float | timedelta | None = None, tags: Iterable[str] | None = None,
- conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing
+ max_jitter: float | timedelta | None = None,
+ tags: Iterable[str] | None = None,
+ conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing,
) -> str:
id = id or str(uuid4())
args = tuple(args or ())
@@ -139,13 +155,25 @@ class Scheduler:
else:
task = self.data_store.get_task(func_or_task_id)
- schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs,
- coalesce=coalesce, misfire_grace_time=misfire_grace_time,
- max_jitter=max_jitter, tags=tags)
+ schedule = Schedule(
+ id=id,
+ task_id=task.id,
+ trigger=trigger,
+ args=args,
+ kwargs=kwargs,
+ coalesce=coalesce,
+ misfire_grace_time=misfire_grace_time,
+ max_jitter=max_jitter,
+ tags=tags,
+ )
schedule.next_fire_time = trigger.next()
self.data_store.add_schedule(schedule, conflict_policy)
- self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task,
- trigger, schedule.next_fire_time)
+ self.logger.info(
+ "Added new schedule (task=%r, trigger=%r); next run time at %s",
+ task,
+ trigger,
+ schedule.next_fire_time,
+ )
return schedule.id
def get_schedule(self, id: str) -> Schedule:
@@ -156,8 +184,12 @@ class Scheduler:
self.data_store.remove_schedules({schedule_id})
def add_job(
- self, func_or_task_id: str | Callable, *, args: Iterable | None = None,
- kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None
+ self,
+ func_or_task_id: str | Callable,
+ *,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
+ tags: Iterable[str] | None = None,
) -> UUID:
"""
Add a job to the data store.
@@ -175,7 +207,12 @@ class Scheduler:
else:
task = self.data_store.get_task(func_or_task_id)
- job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset())
+ job = Job(
+ task_id=task.id,
+ args=args or (),
+ kwargs=kwargs or {},
+ tags=tags or frozenset(),
+ )
self.data_store.add_job(job)
return job.id
@@ -209,8 +246,12 @@ class Scheduler:
return result
def run_job(
- self, func_or_task_id: str | Callable, *, args: Iterable | None = None,
- kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = ()
+ self,
+ func_or_task_id: str | Callable,
+ *,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
+ tags: Iterable[str] | None = (),
) -> Any:
"""
Convenience method to add a job and then return its result (or raise its exception).
@@ -239,12 +280,14 @@ class Scheduler:
elif result.outcome is JobOutcome.cancelled:
raise JobCancelled
else:
- raise RuntimeError(f'Unknown job outcome: {result.outcome}')
+ raise RuntimeError(f"Unknown job outcome: {result.outcome}")
def run(self) -> None:
if self._state is not RunState.starting:
- raise RuntimeError(f'This function cannot be called while the scheduler is in the '
- f'{self._state} state')
+ raise RuntimeError(
+ f"This function cannot be called while the scheduler is in the "
+ f"{self._state} state"
+ )
# Signal that the scheduler has started
self._state = RunState.started
@@ -253,8 +296,10 @@ class Scheduler:
try:
while self._state is RunState.started:
schedules = self.data_store.acquire_schedules(self.identity, 100)
- self.logger.debug('Processing %d schedules retrieved from the data store',
- len(schedules))
+ self.logger.debug(
+ "Processing %d schedules retrieved from the data store",
+ len(schedules),
+ )
now = datetime.now(timezone.utc)
for schedule in schedules:
# Calculate a next fire time for the schedule, if possible
@@ -265,8 +310,11 @@ class Scheduler:
fire_time = calculate_next()
except Exception:
self.logger.exception(
- 'Error computing next fire time for schedule %r of task %r – '
- 'removing schedule', schedule.id, schedule.task_id)
+ "Error computing next fire time for schedule %r of task %r – "
+ "removing schedule",
+ schedule.id,
+ schedule.task_id,
+ )
break
# Stop if the calculated fire time is in the future
@@ -281,7 +329,11 @@ class Scheduler:
fire_times[0] = fire_time
# Add one or more jobs to the job queue
- max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0
+ max_jitter = (
+ schedule.max_jitter.total_seconds()
+ if schedule.max_jitter
+ else 0
+ )
for i, fire_time in enumerate(fire_times):
# Calculate a jitter if max_jitter > 0
jitter = _zero_timedelta
@@ -294,19 +346,30 @@ class Scheduler:
if next_fire_time is not None:
# Jitter must never be so high that it would cause a fire time to
# equal or exceed the next fire time
- jitter_s = min([
- max_jitter,
- (next_fire_time - fire_time
- - _microsecond_delta).total_seconds()
- ])
+ jitter_s = min(
+ [
+ max_jitter,
+ (
+ next_fire_time
+ - fire_time
+ - _microsecond_delta
+ ).total_seconds(),
+ ]
+ )
jitter = timedelta(seconds=random.uniform(0, jitter_s))
fire_time += jitter
schedule.last_fire_time = fire_time
- job = Job(task_id=schedule.task_id, args=schedule.args,
- kwargs=schedule.kwargs, schedule_id=schedule.id,
- scheduled_fire_time=fire_time, jitter=jitter,
- start_deadline=schedule.next_deadline, tags=schedule.tags)
+ job = Job(
+ task_id=schedule.task_id,
+ args=schedule.args,
+ kwargs=schedule.kwargs,
+ schedule_id=schedule.id,
+ scheduled_fire_time=fire_time,
+ jitter=jitter,
+ start_deadline=schedule.next_deadline,
+ tags=schedule.tags,
+ )
self.data_store.add_job(job)
# Update the schedules (and release the scheduler's claim on them)
@@ -321,23 +384,26 @@ class Scheduler:
wait_time = (
self._wakeup_deadline - datetime.now(timezone.utc)
).total_seconds()
- self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)',
- wait_time, self._wakeup_deadline)
+ self.logger.debug(
+ "Sleeping %.3f seconds until the next fire time (%s)",
+ wait_time,
+ self._wakeup_deadline,
+ )
else:
- self.logger.debug('Waiting for any due schedules to appear')
+ self.logger.debug("Waiting for any due schedules to appear")
if self._wakeup_event.wait(wait_time):
self._wakeup_event = threading.Event()
else:
- self.logger.debug('Processing more schedules on the next iteration')
+ self.logger.debug("Processing more schedules on the next iteration")
except BaseException as exc:
self._state = RunState.stopped
- self.logger.exception('Scheduler crashed')
+ self.logger.exception("Scheduler crashed")
self._events.publish(SchedulerStopped(exception=exc))
raise
self._state = RunState.stopped
- self.logger.info('Scheduler stopped')
+ self.logger.info("Scheduler stopped")
self._events.publish(SchedulerStopped())
# def stop(self) -> None:
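For context, the add_schedule() signature reformatted above is used roughly as in this sketch. It is not part of the diff and makes two assumptions: the scheduler's context-manager startup path (partly visible in the hunks above) starts it in a background thread, and the default data store is the in-memory one; the tick() task and all values are hypothetical.

from datetime import timedelta
from time import sleep

from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.interval import IntervalTrigger

def tick() -> None:
    print("tick")

with Scheduler() as scheduler:  # assumed: __enter__ runs the scheduler in a worker thread
    schedule_id = scheduler.add_schedule(
        tick,
        IntervalTrigger(seconds=5),
        misfire_grace_time=timedelta(seconds=30),
        max_jitter=2,
        tags={"example"},
    )
    print(scheduler.get_schedule(schedule_id))
    sleep(12)  # keep the block alive long enough for a couple of runs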
diff --git a/src/apscheduler/serializers/cbor.py b/src/apscheduler/serializers/cbor.py
index bb610c5..9791b77 100644
--- a/src/apscheduler/serializers/cbor.py
+++ b/src/apscheduler/serializers/cbor.py
@@ -16,15 +16,17 @@ class CBORSerializer(Serializer):
load_options: dict[str, Any] = attrs.field(factory=dict)
def __attrs_post_init__(self):
- self.dump_options.setdefault('default', self._default_hook)
- self.load_options.setdefault('tag_hook', self._tag_hook)
+ self.dump_options.setdefault("default", self._default_hook)
+ self.load_options.setdefault("tag_hook", self._tag_hook)
def _default_hook(self, encoder, value):
- if hasattr(value, '__getstate__'):
+ if hasattr(value, "__getstate__"):
marshalled = marshal_object(value)
encoder.encode(CBORTag(self.type_tag, marshalled))
else:
- raise CBOREncodeTypeError(f'cannot serialize type {value.__class__.__name__}')
+ raise CBOREncodeTypeError(
+ f"cannot serialize type {value.__class__.__name__}"
+ )
def _tag_hook(self, decoder, tag: CBORTag, shareable_index: int = None):
if tag.tag == self.type_tag:
diff --git a/src/apscheduler/serializers/json.py b/src/apscheduler/serializers/json.py
index 02ad12f..255c520 100644
--- a/src/apscheduler/serializers/json.py
+++ b/src/apscheduler/serializers/json.py
@@ -12,22 +12,24 @@ from ..marshalling import marshal_date, marshal_object, unmarshal_object
@attrs.define(kw_only=True, eq=False)
class JSONSerializer(Serializer):
- magic_key: str = '_apscheduler_json'
+ magic_key: str = "_apscheduler_json"
dump_options: dict[str, Any] = attrs.field(factory=dict)
load_options: dict[str, Any] = attrs.field(factory=dict)
def __attrs_post_init__(self):
- self.dump_options['default'] = self._default_hook
- self.load_options['object_hook'] = self._object_hook
+ self.dump_options["default"] = self._default_hook
+ self.load_options["object_hook"] = self._object_hook
def _default_hook(self, obj):
- if hasattr(obj, '__getstate__'):
+ if hasattr(obj, "__getstate__"):
cls_ref, state = marshal_object(obj)
return {self.magic_key: [cls_ref, state]}
elif isinstance(obj, datetime):
return marshal_date(obj)
- raise TypeError(f'Object of type {obj.__class__.__name__!r} is not JSON serializable')
+ raise TypeError(
+ f"Object of type {obj.__class__.__name__!r} is not JSON serializable"
+ )
def _object_hook(self, obj_state: dict[str, Any]):
if self.magic_key in obj_state:
@@ -37,7 +39,7 @@ class JSONSerializer(Serializer):
return obj_state
def serialize(self, obj) -> bytes:
- return dumps(obj, ensure_ascii=False, **self.dump_options).encode('utf-8')
+ return dumps(obj, ensure_ascii=False, **self.dump_options).encode("utf-8")
def deserialize(self, serialized: bytes):
return loads(serialized, **self.load_options)
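A quick illustration of the two hooks reformatted above (illustrative only, not part of the commit): serialize() emits UTF-8 encoded JSON, the default hook marshals datetimes and objects exposing __getstate__, and deserialize() only unmarshals dicts carrying the magic key.

from datetime import datetime, timezone
from apscheduler.serializers.json import JSONSerializer

serializer = JSONSerializer()
payload = serializer.serialize({"when": datetime(2022, 4, 20, 1, 0, tzinfo=timezone.utc)})
print(payload)  # UTF-8 JSON; the datetime was converted by _default_hook
# No magic key in the payload, so the datetime comes back as a plain marshalled value
print(serializer.deserialize(payload))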
diff --git a/src/apscheduler/structures.py b/src/apscheduler/structures.py
index dd8204c..8d1ba89 100644
--- a/src/apscheduler/structures.py
+++ b/src/apscheduler/structures.py
@@ -28,20 +28,22 @@ class Task:
id: str
func: Callable = attrs.field(eq=False, order=False)
max_running_jobs: int | None = attrs.field(eq=False, order=False, default=None)
- misfire_grace_time: timedelta | None = attrs.field(eq=False, order=False, default=None)
+ misfire_grace_time: timedelta | None = attrs.field(
+ eq=False, order=False, default=None
+ )
state: Any = None
def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
marshalled = attrs.asdict(self, value_serializer=serialize)
- marshalled['func'] = callable_to_ref(self.func)
- marshalled['state'] = serializer.serialize(self.state) if self.state else None
+ marshalled["func"] = callable_to_ref(self.func)
+ marshalled["state"] = serializer.serialize(self.state) if self.state else None
return marshalled
@classmethod
def unmarshal(cls, serializer: abc.Serializer, marshalled: dict[str, Any]) -> Task:
- marshalled['func'] = callable_from_ref(marshalled['func'])
- if marshalled['state'] is not None:
- marshalled['state'] = serializer.deserialize(marshalled['state'])
+ marshalled["func"] = callable_from_ref(marshalled["func"])
+ if marshalled["state"] is not None:
+ marshalled["state"] = serializer.deserialize(marshalled["state"])
return cls(**marshalled)
@@ -52,14 +54,24 @@ class Schedule:
task_id: str = attrs.field(eq=False, order=False)
trigger: abc.Trigger = attrs.field(eq=False, order=False)
args: tuple = attrs.field(eq=False, order=False, converter=tuple, default=())
- kwargs: dict[str, Any] = attrs.field(eq=False, order=False, converter=dict, default=())
- coalesce: CoalescePolicy = attrs.field(eq=False, order=False, default=CoalescePolicy.latest,
- converter=as_enum(CoalescePolicy))
- misfire_grace_time: timedelta | None = attrs.field(eq=False, order=False, default=None,
- converter=as_timedelta)
- max_jitter: timedelta | None = attrs.field(eq=False, order=False, converter=as_timedelta,
- default=None)
- tags: frozenset[str] = attrs.field(eq=False, order=False, converter=frozenset, default=())
+ kwargs: dict[str, Any] = attrs.field(
+ eq=False, order=False, converter=dict, default=()
+ )
+ coalesce: CoalescePolicy = attrs.field(
+ eq=False,
+ order=False,
+ default=CoalescePolicy.latest,
+ converter=as_enum(CoalescePolicy),
+ )
+ misfire_grace_time: timedelta | None = attrs.field(
+ eq=False, order=False, default=None, converter=as_timedelta
+ )
+ max_jitter: timedelta | None = attrs.field(
+ eq=False, order=False, converter=as_timedelta, default=None
+ )
+ tags: frozenset[str] = attrs.field(
+ eq=False, order=False, converter=frozenset, default=()
+ )
next_fire_time: datetime | None = attrs.field(eq=False, order=False, default=None)
last_fire_time: datetime | None = attrs.field(eq=False, order=False, default=None)
acquired_by: str | None = attrs.field(eq=False, order=False, default=None)
@@ -67,20 +79,22 @@ class Schedule:
def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
marshalled = attrs.asdict(self, value_serializer=serialize)
- marshalled['trigger'] = serializer.serialize(self.trigger)
- marshalled['args'] = serializer.serialize(self.args)
- marshalled['kwargs'] = serializer.serialize(self.kwargs)
+ marshalled["trigger"] = serializer.serialize(self.trigger)
+ marshalled["args"] = serializer.serialize(self.args)
+ marshalled["kwargs"] = serializer.serialize(self.kwargs)
if not self.acquired_by:
- del marshalled['acquired_by']
- del marshalled['acquired_until']
+ del marshalled["acquired_by"]
+ del marshalled["acquired_until"]
return marshalled
@classmethod
- def unmarshal(cls, serializer: abc.Serializer, marshalled: dict[str, Any]) -> Schedule:
- marshalled['trigger'] = serializer.deserialize(marshalled['trigger'])
- marshalled['args'] = serializer.deserialize(marshalled['args'])
- marshalled['kwargs'] = serializer.deserialize(marshalled['kwargs'])
+ def unmarshal(
+ cls, serializer: abc.Serializer, marshalled: dict[str, Any]
+ ) -> Schedule:
+ marshalled["trigger"] = serializer.deserialize(marshalled["trigger"])
+ marshalled["args"] = serializer.deserialize(marshalled["args"])
+ marshalled["kwargs"] = serializer.deserialize(marshalled["kwargs"])
return cls(**marshalled)
@property
@@ -96,15 +110,23 @@ class Job:
id: UUID = attrs.field(factory=uuid4)
task_id: str = attrs.field(eq=False, order=False)
args: tuple = attrs.field(eq=False, order=False, converter=tuple, default=())
- kwargs: dict[str, Any] = attrs.field(eq=False, order=False, converter=dict, default=())
+ kwargs: dict[str, Any] = attrs.field(
+ eq=False, order=False, converter=dict, default=()
+ )
schedule_id: str | None = attrs.field(eq=False, order=False, default=None)
- scheduled_fire_time: datetime | None = attrs.field(eq=False, order=False, default=None)
- jitter: timedelta = attrs.field(eq=False, order=False, converter=as_timedelta,
- factory=timedelta)
+ scheduled_fire_time: datetime | None = attrs.field(
+ eq=False, order=False, default=None
+ )
+ jitter: timedelta = attrs.field(
+ eq=False, order=False, converter=as_timedelta, factory=timedelta
+ )
start_deadline: datetime | None = attrs.field(eq=False, order=False, default=None)
- tags: frozenset[str] = attrs.field(eq=False, order=False, converter=frozenset, default=())
- created_at: datetime = attrs.field(eq=False, order=False,
- factory=partial(datetime.now, timezone.utc))
+ tags: frozenset[str] = attrs.field(
+ eq=False, order=False, converter=frozenset, default=()
+ )
+ created_at: datetime = attrs.field(
+ eq=False, order=False, factory=partial(datetime.now, timezone.utc)
+ )
started_at: datetime | None = attrs.field(eq=False, order=False, default=None)
acquired_by: str | None = attrs.field(eq=False, order=False, default=None)
acquired_until: datetime | None = attrs.field(eq=False, order=False, default=None)
@@ -119,18 +141,18 @@ class Job:
def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
marshalled = attrs.asdict(self, value_serializer=serialize)
- marshalled['args'] = serializer.serialize(self.args)
- marshalled['kwargs'] = serializer.serialize(self.kwargs)
+ marshalled["args"] = serializer.serialize(self.args)
+ marshalled["kwargs"] = serializer.serialize(self.kwargs)
if not self.acquired_by:
- del marshalled['acquired_by']
- del marshalled['acquired_until']
+ del marshalled["acquired_by"]
+ del marshalled["acquired_until"]
return marshalled
@classmethod
def unmarshal(cls, serializer: abc.Serializer, marshalled: dict[str, Any]) -> Job:
- marshalled['args'] = serializer.deserialize(marshalled['args'])
- marshalled['kwargs'] = serializer.deserialize(marshalled['kwargs'])
+ marshalled["args"] = serializer.deserialize(marshalled["args"])
+ marshalled["kwargs"] = serializer.deserialize(marshalled["kwargs"])
return cls(**marshalled)
@@ -146,50 +168,67 @@ class JobInfo:
@classmethod
def from_job(cls, job: Job) -> JobInfo:
- return cls(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
- scheduled_fire_time=job.scheduled_fire_time, jitter=job.jitter,
- start_deadline=job.start_deadline, tags=job.tags)
+ return cls(
+ job_id=job.id,
+ task_id=job.task_id,
+ schedule_id=job.schedule_id,
+ scheduled_fire_time=job.scheduled_fire_time,
+ jitter=job.jitter,
+ start_deadline=job.start_deadline,
+ tags=job.tags,
+ )
@attrs.define(kw_only=True, frozen=True)
class JobResult:
job_id: UUID
- outcome: JobOutcome = attrs.field(eq=False, order=False, converter=as_enum(JobOutcome))
- finished_at: datetime = attrs.field(eq=False, order=False,
- factory=partial(datetime.now, timezone.utc))
+ outcome: JobOutcome = attrs.field(
+ eq=False, order=False, converter=as_enum(JobOutcome)
+ )
+ finished_at: datetime = attrs.field(
+ eq=False, order=False, factory=partial(datetime.now, timezone.utc)
+ )
exception: BaseException | None = attrs.field(eq=False, order=False, default=None)
return_value: Any = attrs.field(eq=False, order=False, default=None)
def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
marshalled = attrs.asdict(self, value_serializer=serialize)
if self.outcome is JobOutcome.error:
- marshalled['exception'] = serializer.serialize(self.exception)
+ marshalled["exception"] = serializer.serialize(self.exception)
else:
- del marshalled['exception']
+ del marshalled["exception"]
if self.outcome is JobOutcome.success:
- marshalled['return_value'] = serializer.serialize(self.return_value)
+ marshalled["return_value"] = serializer.serialize(self.return_value)
else:
- del marshalled['return_value']
+ del marshalled["return_value"]
return marshalled
@classmethod
- def unmarshal(cls, serializer: abc.Serializer, marshalled: dict[str, Any]) -> JobResult:
- if marshalled.get('exception'):
- marshalled['exception'] = serializer.deserialize(marshalled['exception'])
- elif marshalled.get('return_value'):
- marshalled['return_value'] = serializer.deserialize(marshalled['return_value'])
+ def unmarshal(
+ cls, serializer: abc.Serializer, marshalled: dict[str, Any]
+ ) -> JobResult:
+ if marshalled.get("exception"):
+ marshalled["exception"] = serializer.deserialize(marshalled["exception"])
+ elif marshalled.get("return_value"):
+ marshalled["return_value"] = serializer.deserialize(
+ marshalled["return_value"]
+ )
return cls(**marshalled)
@attrs.define(kw_only=True, frozen=True)
class RetrySettings:
- stop: tenacity.stop.stop_base = attrs.field(validator=instance_of(tenacity.stop.stop_base),
- default=tenacity.stop_after_delay(60))
- wait: tenacity.wait.wait_base = attrs.field(validator=instance_of(tenacity.wait.wait_base),
- default=tenacity.wait_exponential(min=0.5, max=20))
+ stop: tenacity.stop.stop_base = attrs.field(
+ validator=instance_of(tenacity.stop.stop_base),
+ default=tenacity.stop_after_delay(60),
+ )
+ wait: tenacity.wait.wait_base = attrs.field(
+ validator=instance_of(tenacity.wait.wait_base),
+ default=tenacity.wait_exponential(min=0.5, max=20),
+ )
@classmethod
def fail_immediately(cls) -> RetrySettings:
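The RetrySettings defaults reformatted above come straight from tenacity; a small usage sketch (not part of the commit, override values are made up):

import tenacity
from apscheduler.structures import RetrySettings

retry = RetrySettings()  # default: retry for up to 60 s with exponential 0.5-20 s waits
impatient = RetrySettings(
    stop=tenacity.stop_after_attempt(3),  # give up after three attempts
    wait=tenacity.wait_fixed(1),          # one second between attempts
)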
diff --git a/src/apscheduler/triggers/calendarinterval.py b/src/apscheduler/triggers/calendarinterval.py
index fbb5896..0aedd15 100644
--- a/src/apscheduler/triggers/calendarinterval.py
+++ b/src/apscheduler/triggers/calendarinterval.py
@@ -6,7 +6,12 @@ from typing import Any
import attrs
from ..abc import Trigger
-from ..marshalling import marshal_date, marshal_timezone, unmarshal_date, unmarshal_timezone
+from ..marshalling import (
+ marshal_date,
+ marshal_timezone,
+ unmarshal_date,
+ unmarshal_timezone,
+)
from ..util import timezone_repr
from ..validators import as_date, as_timezone, require_state_version
@@ -65,7 +70,7 @@ class CalendarIntervalTrigger(Trigger):
second: int = 0
start_date: date = attrs.field(converter=as_date, factory=date.today)
end_date: date | None = attrs.field(converter=as_date, default=None)
- timezone: tzinfo = attrs.field(converter=as_timezone, default='local')
+ timezone: tzinfo = attrs.field(converter=as_timezone, default="local")
_time: time = attrs.field(init=False, eq=False)
_last_fire_date: date | None = attrs.field(init=False, eq=False, default=None)
@@ -73,10 +78,10 @@ class CalendarIntervalTrigger(Trigger):
self._time = time(self.hour, self.minute, self.second, tzinfo=self.timezone)
if self.years == self.months == self.weeks == self.days == 0:
- raise ValueError('interval must be at least 1 day long')
+ raise ValueError("interval must be at least 1 day long")
if self.start_date and self.end_date and self.start_date > self.end_date:
- raise ValueError('end_date cannot be earlier than start_date')
+ raise ValueError("end_date cannot be earlier than start_date")
def next(self) -> datetime | None:
previous_date: date = self._last_fire_date
@@ -114,35 +119,35 @@ class CalendarIntervalTrigger(Trigger):
def __getstate__(self) -> dict[str, Any]:
return {
- 'version': 1,
- 'interval': [self.years, self.months, self.weeks, self.days],
- 'time': [self._time.hour, self._time.minute, self._time.second],
- 'start_date': marshal_date(self.start_date),
- 'end_date': marshal_date(self.end_date),
- 'timezone': marshal_timezone(self.timezone),
- 'last_fire_date': marshal_date(self._last_fire_date)
+ "version": 1,
+ "interval": [self.years, self.months, self.weeks, self.days],
+ "time": [self._time.hour, self._time.minute, self._time.second],
+ "start_date": marshal_date(self.start_date),
+ "end_date": marshal_date(self.end_date),
+ "timezone": marshal_timezone(self.timezone),
+ "last_fire_date": marshal_date(self._last_fire_date),
}
def __setstate__(self, state: dict[str, Any]) -> None:
require_state_version(self, state, 1)
- self.years, self.months, self.weeks, self.days = state['interval']
- self.start_date = unmarshal_date(state['start_date'])
- self.end_date = unmarshal_date(state['end_date'])
- self.timezone = unmarshal_timezone(state['timezone'])
- self._time = time(*state['time'], tzinfo=self.timezone)
- self._last_fire_date = unmarshal_date(state['last_fire_date'])
+ self.years, self.months, self.weeks, self.days = state["interval"]
+ self.start_date = unmarshal_date(state["start_date"])
+ self.end_date = unmarshal_date(state["end_date"])
+ self.timezone = unmarshal_timezone(state["timezone"])
+ self._time = time(*state["time"], tzinfo=self.timezone)
+ self._last_fire_date = unmarshal_date(state["last_fire_date"])
def __repr__(self) -> str:
fields = []
- for field in 'years', 'months', 'weeks', 'days':
+ for field in "years", "months", "weeks", "days":
value = getattr(self, field)
if value > 0:
- fields.append(f'{field}={value}')
+ fields.append(f"{field}={value}")
- fields.append(f'time={self._time.isoformat()!r}')
+ fields.append(f"time={self._time.isoformat()!r}")
fields.append(f"start_date='{self.start_date}'")
if self.end_date:
fields.append(f"end_date='{self.end_date}'")
- fields.append(f'timezone={timezone_repr(self.timezone)!r}')
+ fields.append(f"timezone={timezone_repr(self.timezone)!r}")
return f'{self.__class__.__name__}({", ".join(fields)})'
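For reference, the validation reformatted above behaves as in this sketch (illustrative only; the example values are made up):

from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger

# Fires once a month at 09:00 in the given zone, anchored on today's date
trigger = CalendarIntervalTrigger(months=1, hour=9, timezone="Europe/Helsinki")
print(trigger.next())

try:
    CalendarIntervalTrigger(hour=9)  # years == months == weeks == days == 0
except ValueError as exc:
    print(exc)  # "interval must be at least 1 day long"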
diff --git a/src/apscheduler/triggers/combining.py b/src/apscheduler/triggers/combining.py
index bcee64a..efe99c9 100644
--- a/src/apscheduler/triggers/combining.py
+++ b/src/apscheduler/triggers/combining.py
@@ -15,19 +15,23 @@ from ..validators import as_timedelta, require_state_version
@attrs.define
class BaseCombiningTrigger(Trigger):
triggers: list[Trigger]
- _next_fire_times: list[datetime | None] = attrs.field(init=False, eq=False, factory=list)
+ _next_fire_times: list[datetime | None] = attrs.field(
+ init=False, eq=False, factory=list
+ )
def __getstate__(self) -> dict[str, Any]:
return {
- 'version': 1,
- 'triggers': [marshal_object(trigger) for trigger in self.triggers],
- 'next_fire_times': self._next_fire_times
+ "version": 1,
+ "triggers": [marshal_object(trigger) for trigger in self.triggers],
+ "next_fire_times": self._next_fire_times,
}
@abstractmethod
def __setstate__(self, state: dict[str, Any]) -> None:
- self.triggers = [unmarshal_object(*trigger_state) for trigger_state in state['triggers']]
- self._next_fire_times = state['next_fire_times']
+ self.triggers = [
+ unmarshal_object(*trigger_state) for trigger_state in state["triggers"]
+ ]
+ self._next_fire_times = state["next_fire_times"]
@attrs.define
@@ -87,19 +91,21 @@ class AndTrigger(BaseCombiningTrigger):
def __getstate__(self) -> dict[str, Any]:
state = super().__getstate__()
- state['threshold'] = self.threshold.total_seconds()
- state['max_iterations'] = self.max_iterations
+ state["threshold"] = self.threshold.total_seconds()
+ state["max_iterations"] = self.max_iterations
return state
def __setstate__(self, state: dict[str, Any]) -> None:
require_state_version(self, state, 1)
super().__setstate__(state)
- self.threshold = timedelta(seconds=state['threshold'])
- self.max_iterations = state['max_iterations']
+ self.threshold = timedelta(seconds=state["threshold"])
+ self.max_iterations = state["max_iterations"]
def __repr__(self) -> str:
- return f'{self.__class__.__name__}({self.triggers}, ' \
- f'threshold={self.threshold.total_seconds()}, max_iterations={self.max_iterations})'
+ return (
+ f"{self.__class__.__name__}({self.triggers}, "
+ f"threshold={self.threshold.total_seconds()}, max_iterations={self.max_iterations})"
+ )
@attrs.define
@@ -120,8 +126,10 @@ class OrTrigger(BaseCombiningTrigger):
self._next_fire_times = [t.next() for t in self.triggers]
# Find out the earliest of the fire times
- earliest_time: datetime | None = min((fire_time for fire_time in self._next_fire_times
- if fire_time is not None), default=None)
+ earliest_time: datetime | None = min(
+ (fire_time for fire_time in self._next_fire_times if fire_time is not None),
+ default=None,
+ )
if earliest_time is not None:
# Generate new fire times for the trigger(s) that generated the earliest fire time
for i, fire_time in enumerate(self._next_fire_times):
@@ -135,4 +143,4 @@ class OrTrigger(BaseCombiningTrigger):
super().__setstate__(state)
def __repr__(self) -> str:
- return f'{self.__class__.__name__}({self.triggers})'
+ return f"{self.__class__.__name__}({self.triggers})"
diff --git a/src/apscheduler/triggers/cron/__init__.py b/src/apscheduler/triggers/cron/__init__.py
index df1f9e5..1bd2be1 100644
--- a/src/apscheduler/triggers/cron/__init__.py
+++ b/src/apscheduler/triggers/cron/__init__.py
@@ -7,11 +7,22 @@ import attrs
from tzlocal import get_localzone
from ...abc import Trigger
-from ...marshalling import marshal_date, marshal_timezone, unmarshal_date, unmarshal_timezone
+from ...marshalling import (
+ marshal_date,
+ marshal_timezone,
+ unmarshal_date,
+ unmarshal_timezone,
+)
from ...util import timezone_repr
from ...validators import as_aware_datetime, as_timezone, require_state_version
from .fields import (
- DEFAULT_VALUES, BaseField, DayOfMonthField, DayOfWeekField, MonthField, WeekField)
+ DEFAULT_VALUES,
+ BaseField,
+ DayOfMonthField,
+ DayOfWeekField,
+ MonthField,
+ WeekField,
+)
@attrs.define(kw_only=True)
@@ -37,14 +48,14 @@ class CronTrigger(Trigger):
"""
FIELDS_MAP: ClassVar[list[tuple[str, type[BaseField]]]] = [
- ('year', BaseField),
- ('month', MonthField),
- ('day', DayOfMonthField),
- ('week', WeekField),
- ('day_of_week', DayOfWeekField),
- ('hour', BaseField),
- ('minute', BaseField),
- ('second', BaseField)
+ ("year", BaseField),
+ ("month", MonthField),
+ ("day", DayOfMonthField),
+ ("week", WeekField),
+ ("day_of_week", DayOfWeekField),
+ ("hour", BaseField),
+ ("minute", BaseField),
+ ("second", BaseField),
]
year: int | str | None = None
@@ -55,32 +66,46 @@ class CronTrigger(Trigger):
hour: int | str | None = None
minute: int | str | None = None
second: int | str | None = None
- start_time: datetime = attrs.field(converter=as_aware_datetime, factory=datetime.now)
+ start_time: datetime = attrs.field(
+ converter=as_aware_datetime, factory=datetime.now
+ )
end_time: datetime | None = None
timezone: tzinfo | str = attrs.field(converter=as_timezone, factory=get_localzone)
_fields: list[BaseField] = attrs.field(init=False, eq=False, factory=list)
_last_fire_time: datetime | None = attrs.field(init=False, eq=False, default=None)
def __attrs_post_init__(self) -> None:
- self._set_fields([self.year, self.month, self.day, self.week, self.day_of_week, self.hour,
- self.minute, self.second])
+ self._set_fields(
+ [
+ self.year,
+ self.month,
+ self.day,
+ self.week,
+ self.day_of_week,
+ self.hour,
+ self.minute,
+ self.second,
+ ]
+ )
self._last_fire_time: datetime | None = None
def _set_fields(self, values: Sequence[int | str | None]) -> None:
self._fields = []
- assigned_values = {field_name: value
- for (field_name, _), value in zip(self.FIELDS_MAP, values)
- if value is not None}
+ assigned_values = {
+ field_name: value
+ for (field_name, _), value in zip(self.FIELDS_MAP, values)
+ if value is not None
+ }
for field_name, field_class in self.FIELDS_MAP:
exprs = assigned_values.pop(field_name, None)
if exprs is None:
- exprs = '*' if assigned_values else DEFAULT_VALUES[field_name]
+ exprs = "*" if assigned_values else DEFAULT_VALUES[field_name]
field = field_class(field_name, exprs)
self._fields.append(field)
@classmethod
- def from_crontab(cls, expr: str, timezone: str | tzinfo = 'local') -> CronTrigger:
+ def from_crontab(cls, expr: str, timezone: str | tzinfo = "local") -> CronTrigger:
"""
Create a :class:`~CronTrigger` from a standard crontab expression.
@@ -93,12 +118,20 @@ class CronTrigger(Trigger):
"""
values = expr.split()
if len(values) != 5:
- raise ValueError(f'Wrong number of fields; got {len(values)}, expected 5')
-
- return cls(minute=values[0], hour=values[1], day=values[2], month=values[3],
- day_of_week=values[4], timezone=timezone)
-
- def _increment_field_value(self, dateval: datetime, fieldnum: int) -> tuple[datetime, int]:
+ raise ValueError(f"Wrong number of fields; got {len(values)}, expected 5")
+
+ return cls(
+ minute=values[0],
+ hour=values[1],
+ day=values[2],
+ month=values[3],
+ day_of_week=values[4],
+ timezone=timezone,
+ )
+
+ def _increment_field_value(
+ self, dateval: datetime, fieldnum: int
+ ) -> tuple[datetime, int]:
"""
Increments the designated field and resets all less significant fields to their minimum
values.
@@ -136,11 +169,14 @@ class CronTrigger(Trigger):
i += 1
difference = datetime(**values) - dateval.replace(tzinfo=None)
- dateval = datetime.fromtimestamp(dateval.timestamp() + difference.total_seconds(),
- self.timezone)
+ dateval = datetime.fromtimestamp(
+ dateval.timestamp() + difference.total_seconds(), self.timezone
+ )
return dateval, fieldnum
- def _set_field_value(self, dateval: datetime, fieldnum: int, new_value: int) -> datetime:
+ def _set_field_value(
+ self, dateval: datetime, fieldnum: int, new_value: int
+ ) -> datetime:
values = {}
for i, field in enumerate(self._fields):
if field.real:
@@ -168,14 +204,18 @@ class CronTrigger(Trigger):
if next_value is None:
# No valid value was found
- next_time, fieldnum = self._increment_field_value(next_time, fieldnum - 1)
+ next_time, fieldnum = self._increment_field_value(
+ next_time, fieldnum - 1
+ )
elif next_value > curr_value:
# A valid, but higher than the starting value, was found
if field.real:
next_time = self._set_field_value(next_time, fieldnum, next_value)
fieldnum += 1
else:
- next_time, fieldnum = self._increment_field_value(next_time, fieldnum)
+ next_time, fieldnum = self._increment_field_value(
+ next_time, fieldnum
+ )
else:
# A valid value was found, no changes necessary
fieldnum += 1
@@ -190,29 +230,29 @@ class CronTrigger(Trigger):
def __getstate__(self) -> dict[str, Any]:
return {
- 'version': 1,
- 'timezone': marshal_timezone(self.timezone),
- 'fields': [str(f) for f in self._fields],
- 'start_time': marshal_date(self.start_time),
- 'end_time': marshal_date(self.end_time),
- 'last_fire_time': marshal_date(self._last_fire_time)
+ "version": 1,
+ "timezone": marshal_timezone(self.timezone),
+ "fields": [str(f) for f in self._fields],
+ "start_time": marshal_date(self.start_time),
+ "end_time": marshal_date(self.end_time),
+ "last_fire_time": marshal_date(self._last_fire_time),
}
def __setstate__(self, state: dict[str, Any]) -> None:
require_state_version(self, state, 1)
- self.timezone = unmarshal_timezone(state['timezone'])
- self.start_time = unmarshal_date(state['start_time'])
- self.end_time = unmarshal_date(state['end_time'])
- self._last_fire_time = unmarshal_date(state['last_fire_time'])
- self._set_fields(state['fields'])
+ self.timezone = unmarshal_timezone(state["timezone"])
+ self.start_time = unmarshal_date(state["start_time"])
+ self.end_time = unmarshal_date(state["end_time"])
+ self._last_fire_time = unmarshal_date(state["last_fire_time"])
+ self._set_fields(state["fields"])
def __repr__(self) -> str:
- fields = [f'{field.name}={str(field)!r}' for field in self._fields]
- fields.append(f'start_time={self.start_time.isoformat()!r}')
+ fields = [f"{field.name}={str(field)!r}" for field in self._fields]
+ fields.append(f"start_time={self.start_time.isoformat()!r}")
if self.end_time:
- fields.append(f'end_time={self.end_time.isoformat()!r}')
+ fields.append(f"end_time={self.end_time.isoformat()!r}")
- fields.append(f'timezone={timezone_repr(self.timezone)!r}')
+ fields.append(f"timezone={timezone_repr(self.timezone)!r}")
return f'CronTrigger({", ".join(fields)})'
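The from_crontab() classmethod reformatted above accepts the standard five-field crontab form; a short sketch (illustrative only, expression values made up):

from apscheduler.triggers.cron import CronTrigger

# Field order: minute, hour, day, month, day_of_week
trigger = CronTrigger.from_crontab("*/15 9-17 * * mon-fri", timezone="UTC")
print(trigger.next())

try:
    CronTrigger.from_crontab("*/15 9-17 *")
except ValueError as exc:
    print(exc)  # "Wrong number of fields; got 3, expected 5"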
diff --git a/src/apscheduler/triggers/cron/expressions.py b/src/apscheduler/triggers/cron/expressions.py
index 5c94afa..6d39a45 100644
--- a/src/apscheduler/triggers/cron/expressions.py
+++ b/src/apscheduler/triggers/cron/expressions.py
@@ -7,32 +7,47 @@ from datetime import datetime
from ...validators import as_int
-WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
-MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
+WEEKDAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]
+MONTHS = [
+ "jan",
+ "feb",
+ "mar",
+ "apr",
+ "may",
+ "jun",
+ "jul",
+ "aug",
+ "sep",
+ "oct",
+ "nov",
+ "dec",
+]
def get_weekday_index(weekday: str) -> int:
try:
return WEEKDAYS.index(weekday.lower())
except ValueError:
- raise ValueError(f'Invalid weekday name {weekday!r}') from None
+ raise ValueError(f"Invalid weekday name {weekday!r}") from None
class AllExpression:
- __slots__ = 'step'
+ __slots__ = "step"
- value_re = re.compile(r'\*(?:/(?P<step>\d+))?$')
+ value_re = re.compile(r"\*(?:/(?P<step>\d+))?$")
def __init__(self, step: str | int | None = None):
self.step = as_int(step)
if self.step == 0:
- raise ValueError('Step must be higher than 0')
+ raise ValueError("Step must be higher than 0")
def validate_range(self, field_name: str, min_value: int, max_value: int) -> None:
value_range = max_value - min_value
if self.step and self.step > value_range:
- raise ValueError(f'the step value ({self.step}) is higher than the total range of the '
- f'expression ({value_range})')
+ raise ValueError(
+ f"the step value ({self.step}) is higher than the total range of the "
+ f"expression ({value_range})"
+ )
def get_next_value(self, dateval: datetime, field) -> int | None:
start = field.get_value(dateval)
@@ -49,16 +64,20 @@ class AllExpression:
return nextval if nextval <= maxval else None
def __str__(self):
- return f'*/{self.step}' if self.step else '*'
+ return f"*/{self.step}" if self.step else "*"
class RangeExpression(AllExpression):
- __slots__ = 'first', 'last'
+ __slots__ = "first", "last"
- value_re = re.compile(r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$')
+ value_re = re.compile(r"(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$")
- def __init__(self, first: str | int, last: str | int | None = None,
- step: str | int | None = None):
+ def __init__(
+ self,
+ first: str | int,
+ last: str | int | None = None,
+ step: str | int | None = None,
+ ):
super().__init__(step)
self.first = as_int(first)
self.last = as_int(last)
@@ -66,20 +85,28 @@ class RangeExpression(AllExpression):
if self.last is None and self.step is None:
self.last = self.first
if self.last is not None and self.first > self.last:
- raise ValueError('The minimum value in a range must not be higher than the maximum')
+ raise ValueError(
+ "The minimum value in a range must not be higher than the maximum"
+ )
def validate_range(self, field_name: str, min_value: int, max_value: int) -> None:
super().validate_range(field_name, min_value, max_value)
if self.first < min_value:
- raise ValueError(f'the first value ({self.first}) is lower than the minimum value '
- f'({min_value})')
+ raise ValueError(
+ f"the first value ({self.first}) is lower than the minimum value "
+ f"({min_value})"
+ )
if self.last is not None and self.last > max_value:
- raise ValueError(f'the last value ({self.last}) is higher than the maximum value '
- f'({max_value})')
+ raise ValueError(
+ f"the last value ({self.last}) is higher than the maximum value "
+ f"({max_value})"
+ )
value_range = (self.last or max_value) - self.first
if self.step and self.step > value_range:
- raise ValueError(f'the step value ({self.step}) is higher than the total range of the '
- f'expression ({value_range})')
+ raise ValueError(
+ f"the step value ({self.step}) is higher than the total range of the "
+ f"expression ({value_range})"
+ )
def get_next_value(self, date, field):
startval = field.get_value(date)
@@ -100,12 +127,12 @@ class RangeExpression(AllExpression):
def __str__(self):
if self.last != self.first and self.last is not None:
- rangeval = f'{self.first}-{self.last}'
+ rangeval = f"{self.first}-{self.last}"
else:
rangeval = str(self.first)
if self.step:
- return f'{rangeval}/{self.step}'
+ return f"{rangeval}/{self.step}"
return rangeval
@@ -113,19 +140,19 @@ class RangeExpression(AllExpression):
class MonthRangeExpression(RangeExpression):
__slots__ = ()
- value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', re.IGNORECASE)
+ value_re = re.compile(r"(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?", re.IGNORECASE)
def __init__(self, first, last=None):
try:
first_num = MONTHS.index(first.lower()) + 1
except ValueError:
- raise ValueError(f'Invalid month name {first!r}') from None
+ raise ValueError(f"Invalid month name {first!r}") from None
if last:
try:
last_num = MONTHS.index(last.lower()) + 1
except ValueError:
- raise ValueError(f'Invalid month name {last!r}') from None
+ raise ValueError(f"Invalid month name {last!r}") from None
else:
last_num = None
@@ -133,7 +160,7 @@ class MonthRangeExpression(RangeExpression):
def __str__(self):
if self.last != self.first and self.last is not None:
- return f'{MONTHS[self.first - 1]}-{MONTHS[self.last - 1]}'
+ return f"{MONTHS[self.first - 1]}-{MONTHS[self.last - 1]}"
return MONTHS[self.first - 1]
@@ -141,7 +168,7 @@ class MonthRangeExpression(RangeExpression):
class WeekdayRangeExpression(RangeExpression):
__slots__ = ()
- value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', re.IGNORECASE)
+ value_re = re.compile(r"(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?", re.IGNORECASE)
def __init__(self, first: str, last: str | None = None):
first_num = get_weekday_index(first)
@@ -150,17 +177,19 @@ class WeekdayRangeExpression(RangeExpression):
def __str__(self):
if self.last != self.first and self.last is not None:
- return f'{WEEKDAYS[self.first]}-{WEEKDAYS[self.last]}'
+ return f"{WEEKDAYS[self.first]}-{WEEKDAYS[self.last]}"
return WEEKDAYS[self.first]
class WeekdayPositionExpression(AllExpression):
- __slots__ = 'option_num', 'weekday'
+ __slots__ = "option_num", "weekday"
- options = ['1st', '2nd', '3rd', '4th', '5th', 'last']
- value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))' %
- '|'.join(options), re.IGNORECASE)
+ options = ["1st", "2nd", "3rd", "4th", "5th", "last"]
+ value_re = re.compile(
+ r"(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))" % "|".join(options),
+ re.IGNORECASE,
+ )
def __init__(self, option_name: str, weekday_name: str):
super().__init__(None)
@@ -168,7 +197,7 @@ class WeekdayPositionExpression(AllExpression):
try:
self.weekday = WEEKDAYS.index(weekday_name.lower())
except ValueError:
- raise ValueError(f'Invalid weekday name {weekday_name!r}') from None
+ raise ValueError(f"Invalid weekday name {weekday_name!r}") from None
def get_next_value(self, dateval: datetime, field) -> int | None:
# Figure out the weekday of the month's first day and the number of days in that month
@@ -191,13 +220,13 @@ class WeekdayPositionExpression(AllExpression):
return None
def __str__(self):
- return f'{self.options[self.option_num]} {WEEKDAYS[self.weekday]}'
+ return f"{self.options[self.option_num]} {WEEKDAYS[self.weekday]}"
class LastDayOfMonthExpression(AllExpression):
__slots__ = ()
- value_re = re.compile(r'last', re.IGNORECASE)
+ value_re = re.compile(r"last", re.IGNORECASE)
def __init__(self):
super().__init__(None)
@@ -206,4 +235,4 @@ class LastDayOfMonthExpression(AllExpression):
return monthrange(dateval.year, dateval.month)[1]
def __str__(self):
- return 'last'
+ return "last"
diff --git a/src/apscheduler/triggers/cron/fields.py b/src/apscheduler/triggers/cron/fields.py
index e68fdca..b70f643 100644
--- a/src/apscheduler/triggers/cron/fields.py
+++ b/src/apscheduler/triggers/cron/fields.py
@@ -7,20 +7,51 @@ from datetime import datetime
from typing import Any, ClassVar, Sequence
from .expressions import (
- WEEKDAYS, AllExpression, LastDayOfMonthExpression, MonthRangeExpression, RangeExpression,
- WeekdayPositionExpression, WeekdayRangeExpression, get_weekday_index)
-
-MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1, 'day_of_week': 0, 'hour': 0,
- 'minute': 0, 'second': 0}
-MAX_VALUES = {'year': 9999, 'month': 12, 'day': 31, 'week': 53, 'day_of_week': 7, 'hour': 23,
- 'minute': 59, 'second': 59}
-DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*', 'day_of_week': '*', 'hour': 0,
- 'minute': 0, 'second': 0}
-SEPARATOR = re.compile(' *, *')
+ WEEKDAYS,
+ AllExpression,
+ LastDayOfMonthExpression,
+ MonthRangeExpression,
+ RangeExpression,
+ WeekdayPositionExpression,
+ WeekdayRangeExpression,
+ get_weekday_index,
+)
+
+MIN_VALUES = {
+ "year": 1970,
+ "month": 1,
+ "day": 1,
+ "week": 1,
+ "day_of_week": 0,
+ "hour": 0,
+ "minute": 0,
+ "second": 0,
+}
+MAX_VALUES = {
+ "year": 9999,
+ "month": 12,
+ "day": 31,
+ "week": 53,
+ "day_of_week": 7,
+ "hour": 23,
+ "minute": 59,
+ "second": 59,
+}
+DEFAULT_VALUES = {
+ "year": "*",
+ "month": 1,
+ "day": 1,
+ "week": "*",
+ "day_of_week": "*",
+ "hour": 0,
+ "minute": 0,
+ "second": 0,
+}
+SEPARATOR = re.compile(" *, *")
class BaseField:
- __slots__ = 'name', 'expressions'
+ __slots__ = "name", "expressions"
real: ClassVar[bool] = True
compilers: ClassVar[Any] = (AllExpression, RangeExpression)
@@ -61,19 +92,22 @@ class BaseField:
compiled_expr = compiler(**match.groupdict())
try:
- compiled_expr.validate_range(self.name, MIN_VALUES[self.name],
- MAX_VALUES[self.name])
+ compiled_expr.validate_range(
+ self.name, MIN_VALUES[self.name], MAX_VALUES[self.name]
+ )
except ValueError as exc:
- raise ValueError(f'Error validating expression {expr!r}: {exc}') from exc
+ raise ValueError(
+ f"Error validating expression {expr!r}: {exc}"
+ ) from exc
self.expressions.append(compiled_expr)
return
- raise ValueError(f'Unrecognized expression {expr!r} for field {self.name!r}')
+ raise ValueError(f"Unrecognized expression {expr!r} for field {self.name!r}")
def __str__(self):
expr_strings = (str(e) for e in self.expressions)
- return ','.join(expr_strings)
+ return ",".join(expr_strings)
class WeekField(BaseField, real=False):
@@ -83,8 +117,9 @@ class WeekField(BaseField, real=False):
return dateval.isocalendar()[1]
-class DayOfMonthField(BaseField,
- extra_compilers=(WeekdayPositionExpression, LastDayOfMonthExpression)):
+class DayOfMonthField(
+ BaseField, extra_compilers=(WeekdayPositionExpression, LastDayOfMonthExpression)
+):
__slots__ = ()
def get_max(self, dateval: datetime) -> int:
@@ -107,7 +142,7 @@ class DayOfWeekField(BaseField, real=False, extra_compilers=(WeekdayRangeExpress
else:
last = first
- expr = f'{WEEKDAYS[first]}-{WEEKDAYS[last]}'
+ expr = f"{WEEKDAYS[first]}-{WEEKDAYS[last]}"
# For expressions like Sun-Tue or Sat-Mon, add two expressions that together cover the
# expected weekdays
@@ -117,8 +152,8 @@ class DayOfWeekField(BaseField, real=False, extra_compilers=(WeekdayRangeExpress
first_index = get_weekday_index(groups[0])
last_index = get_weekday_index(groups[1])
if first_index > last_index:
- super().append_expression(f'{WEEKDAYS[0]}-{groups[1]}')
- super().append_expression(f'{groups[0]}-{WEEKDAYS[-1]}')
+ super().append_expression(f"{WEEKDAYS[0]}-{groups[1]}")
+ super().append_expression(f"{groups[0]}-{WEEKDAYS[-1]}")
return
super().append_expression(expr)
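The DayOfWeekField wrap-around handling reformatted above splits a range like sat-mon into two ordinary ranges; the index comparison it relies on can be checked in isolation (illustrative sketch using the same WEEKDAYS list):

WEEKDAYS = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]

first, last = "sat", "mon"
if WEEKDAYS.index(first) > WEEKDAYS.index(last):
    # covered by two expressions that together span the wrap-around
    print(f"{WEEKDAYS[0]}-{last}", f"{first}-{WEEKDAYS[-1]}")  # -> mon-mon sat-sun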
diff --git a/src/apscheduler/triggers/date.py b/src/apscheduler/triggers/date.py
index 173c972..b55028a 100644
--- a/src/apscheduler/triggers/date.py
+++ b/src/apscheduler/triggers/date.py
@@ -30,15 +30,15 @@ class DateTrigger(Trigger):
def __getstate__(self) -> dict[str, Any]:
return {
- 'version': 1,
- 'run_time': marshal_date(self.run_time),
- 'completed': self._completed
+ "version": 1,
+ "run_time": marshal_date(self.run_time),
+ "completed": self._completed,
}
def __setstate__(self, state: dict[str, Any]) -> None:
require_state_version(self, state, 1)
- self.run_time = unmarshal_date(state['run_time'])
- self._completed = state['completed']
+ self.run_time = unmarshal_date(state["run_time"])
+ self._completed = state["completed"]
def __repr__(self) -> str:
return f"{self.__class__.__name__}('{self.run_time}')"
diff --git a/src/apscheduler/triggers/interval.py b/src/apscheduler/triggers/interval.py
index 629499b..fe410bd 100644
--- a/src/apscheduler/triggers/interval.py
+++ b/src/apscheduler/triggers/interval.py
@@ -36,21 +36,28 @@ class IntervalTrigger(Trigger):
minutes: float = 0
seconds: float = 0
microseconds: float = 0
- start_time: datetime = attrs.field(converter=as_aware_datetime, factory=datetime.now)
+ start_time: datetime = attrs.field(
+ converter=as_aware_datetime, factory=datetime.now
+ )
end_time: datetime | None = attrs.field(converter=as_aware_datetime, default=None)
_interval: timedelta = attrs.field(init=False, eq=False, repr=False)
_last_fire_time: datetime | None = attrs.field(init=False, eq=False, default=None)
def __attrs_post_init__(self) -> None:
- self._interval = timedelta(weeks=self.weeks, days=self.days, hours=self.hours,
- minutes=self.minutes, seconds=self.seconds,
- microseconds=self.microseconds)
+ self._interval = timedelta(
+ weeks=self.weeks,
+ days=self.days,
+ hours=self.hours,
+ minutes=self.minutes,
+ seconds=self.seconds,
+ microseconds=self.microseconds,
+ )
if self._interval.total_seconds() <= 0:
- raise ValueError('The time interval must be positive')
+ raise ValueError("The time interval must be positive")
if self.end_time and self.end_time < self.start_time:
- raise ValueError('end_time cannot be earlier than start_time')
+ raise ValueError("end_time cannot be earlier than start_time")
def next(self) -> datetime | None:
if self._last_fire_time is None:
@@ -65,31 +72,48 @@ class IntervalTrigger(Trigger):
def __getstate__(self) -> dict[str, Any]:
return {
- 'version': 1,
- 'interval': [self.weeks, self.days, self.hours, self.minutes, self.seconds,
- self.microseconds],
- 'start_time': marshal_date(self.start_time),
- 'end_time': marshal_date(self.end_time),
- 'last_fire_time': marshal_date(self._last_fire_time)
+ "version": 1,
+ "interval": [
+ self.weeks,
+ self.days,
+ self.hours,
+ self.minutes,
+ self.seconds,
+ self.microseconds,
+ ],
+ "start_time": marshal_date(self.start_time),
+ "end_time": marshal_date(self.end_time),
+ "last_fire_time": marshal_date(self._last_fire_time),
}
def __setstate__(self, state: dict[str, Any]) -> None:
require_state_version(self, state, 1)
- self.weeks, self.days, self.hours, self.minutes, self.seconds, self.microseconds = \
- state['interval']
- self.start_time = unmarshal_date(state['start_time'])
- self.end_time = unmarshal_date(state['end_time'])
- self._last_fire_time = unmarshal_date(state['last_fire_time'])
- self._interval = timedelta(weeks=self.weeks, days=self.days, hours=self.hours,
- minutes=self.minutes, seconds=self.seconds,
- microseconds=self.microseconds)
+ (
+ self.weeks,
+ self.days,
+ self.hours,
+ self.minutes,
+ self.seconds,
+ self.microseconds,
+ ) = state["interval"]
+ self.start_time = unmarshal_date(state["start_time"])
+ self.end_time = unmarshal_date(state["end_time"])
+ self._last_fire_time = unmarshal_date(state["last_fire_time"])
+ self._interval = timedelta(
+ weeks=self.weeks,
+ days=self.days,
+ hours=self.hours,
+ minutes=self.minutes,
+ seconds=self.seconds,
+ microseconds=self.microseconds,
+ )
def __repr__(self) -> str:
fields = []
- for field in 'weeks', 'days', 'hours', 'minutes', 'seconds', 'microseconds':
+ for field in "weeks", "days", "hours", "minutes", "seconds", "microseconds":
value = getattr(self, field)
if value > 0:
- fields.append(f'{field}={value}')
+ fields.append(f"{field}={value}")
fields.append(f"start_time='{self.start_time}'")
if self.end_time:
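A quick check of the interval validation reformatted above (illustrative only, not part of the commit):

from apscheduler.triggers.interval import IntervalTrigger

trigger = IntervalTrigger(minutes=10)  # start_time defaults to "now"
print(trigger.next())

try:
    IntervalTrigger()  # every duration field left at 0
except ValueError as exc:
    print(exc)  # "The time interval must be positive"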
diff --git a/src/apscheduler/util.py b/src/apscheduler/util.py
index 53b9db8..7ff4858 100644
--- a/src/apscheduler/util.py
+++ b/src/apscheduler/util.py
@@ -11,7 +11,7 @@ if sys.version_info >= (3, 9):
else:
from backports.zoneinfo import ZoneInfo
-T = TypeVar('T')
+T = TypeVar("T")
class _Undefined:
@@ -19,10 +19,12 @@ class _Undefined:
return False
def __repr__(self):
- return '<undefined>'
+ return "<undefined>"
-undefined = _Undefined() #: a unique object that only signifies that no value is defined
+undefined = (
+ _Undefined()
+) #: a unique object that only signifies that no value is defined
def timezone_repr(timezone: tzinfo) -> str:
@@ -72,10 +74,10 @@ def reentrant(cls: type[T]) -> type[T]:
return await previous_aexit(self, exc_type, exc_val, exc_tb)
loans: dict[T, int] = defaultdict(lambda: 0)
- previous_enter: Callable = getattr(cls, '__enter__', None)
- previous_exit: Callable = getattr(cls, '__exit__', None)
- previous_aenter: Callable = getattr(cls, '__aenter__', None)
- previous_aexit: Callable = getattr(cls, '__aexit__', None)
+ previous_enter: Callable = getattr(cls, "__enter__", None)
+ previous_exit: Callable = getattr(cls, "__exit__", None)
+ previous_aenter: Callable = getattr(cls, "__aenter__", None)
+ previous_aexit: Callable = getattr(cls, "__aexit__", None)
if previous_enter and previous_exit:
cls.__enter__ = __enter__
cls.__exit__ = __exit__
diff --git a/src/apscheduler/validators.py b/src/apscheduler/validators.py
index baa7b50..995dce7 100644
--- a/src/apscheduler/validators.py
+++ b/src/apscheduler/validators.py
@@ -35,18 +35,20 @@ def as_timezone(value: str | tzinfo | None) -> tzinfo:
:return: a timezone object
"""
- if value is None or value == 'local':
+ if value is None or value == "local":
return get_localzone()
elif isinstance(value, str):
return ZoneInfo(value)
elif isinstance(value, tzinfo):
if value is timezone.utc:
- return ZoneInfo('UTC')
+ return ZoneInfo("UTC")
else:
return value
- raise TypeError(f'Expected tzinfo instance or timezone name, got '
- f'{value.__class__.__qualname__} instead')
+ raise TypeError(
+ f"Expected tzinfo instance or timezone name, got "
+ f"{value.__class__.__qualname__} instead"
+ )
def as_date(value: date | str | None) -> date | None:
@@ -64,7 +66,9 @@ def as_date(value: date | str | None) -> date | None:
elif isinstance(value, date):
return value
- raise TypeError(f'Expected string or date, got {value.__class__.__qualname__} instead')
+ raise TypeError(
+ f"Expected string or date, got {value.__class__.__qualname__} instead"
+ )
def as_timestamp(value: datetime | None) -> float | None:
@@ -94,8 +98,8 @@ def as_aware_datetime(value: datetime | str | None) -> datetime | None:
return None
if isinstance(value, str):
- if value.upper().endswith('Z'):
- value = value[:-1] + '+00:00'
+ if value.upper().endswith("Z"):
+ value = value[:-1] + "+00:00"
value = datetime.fromisoformat(value)
@@ -105,17 +109,19 @@ def as_aware_datetime(value: datetime | str | None) -> datetime | None:
else:
return value
- raise TypeError(f'Expected string or datetime, got {value.__class__.__qualname__} instead')
+ raise TypeError(
+ f"Expected string or datetime, got {value.__class__.__qualname__} instead"
+ )
def positive_number(instance, attribute, value) -> None:
if value <= 0:
- raise ValueError(f'Expected positive number, got {value} instead')
+ raise ValueError(f"Expected positive number, got {value} instead")
def non_negative_number(instance, attribute, value) -> None:
if value < 0:
- raise ValueError(f'Expected non-negative number, got {value} instead')
+ raise ValueError(f"Expected non-negative number, got {value} instead")
def as_positive_integer(value, name: str) -> int:
@@ -123,9 +129,11 @@ def as_positive_integer(value, name: str) -> int:
if value > 0:
return value
else:
- raise ValueError(f'{name} must be positive')
+ raise ValueError(f"{name} must be positive")
- raise TypeError(f'{name} must be an integer, got {value.__class__.__name__} instead')
+ raise TypeError(
+ f"{name} must be an integer, got {value.__class__.__name__} instead"
+ )
def as_timedelta(value: timedelta | float) -> timedelta:
@@ -142,30 +150,36 @@ def as_list(value, element_type: type, name: str) -> list:
value = list(value)
for i, element in enumerate(value):
if not isinstance(element, element_type):
- raise TypeError(f'Element at index {i} of {name} is not of the expected type '
- f'({element_type.__name__}')
+ raise TypeError(
+ f"Element at index {i} of {name} is not of the expected type "
+ f"({element_type.__name__}"
+ )
return value
def aware_datetime(instance: Any, attribute: Attribute, value: datetime) -> None:
if not value.tzinfo:
- raise ValueError(f'{attribute.name} must be a timezone aware datetime')
+ raise ValueError(f"{attribute.name} must be a timezone aware datetime")
-def require_state_version(trigger: Trigger, state: dict[str, Any], max_version: int) -> None:
+def require_state_version(
+ trigger: Trigger, state: dict[str, Any], max_version: int
+) -> None:
try:
- if state['version'] > max_version:
+ if state["version"] > max_version:
raise DeserializationError(
- f'{trigger.__class__.__name__} received a serialized state with version '
+ f"{trigger.__class__.__name__} received a serialized state with version "
f'{state["version"]}, but it only supports up to version {max_version}. '
- f'This can happen when an older version of APScheduler is being used with a data '
- f'store that was previously used with a newer APScheduler version.'
+ f"This can happen when an older version of APScheduler is being used with a data "
+ f"store that was previously used with a newer APScheduler version."
)
except KeyError as exc:
- raise DeserializationError('Missing "version" key in the serialized state') from exc
+ raise DeserializationError(
+ 'Missing "version" key in the serialized state'
+ ) from exc
def positive_integer(inst, field: attrs.Attribute, value) -> None:
if value <= 0:
- raise ValueError(f'{field} must be a positive integer')
+ raise ValueError(f"{field} must be a positive integer")
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index c581b0e..1a6e156 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -11,7 +11,12 @@ from uuid import UUID
import anyio
import attrs
-from anyio import TASK_STATUS_IGNORED, create_task_group, get_cancelled_exc_class, move_on_after
+from anyio import (
+ TASK_STATUS_IGNORED,
+ create_task_group,
+ get_cancelled_exc_class,
+ move_on_after,
+)
from anyio.abc import CancelScope
from ..abc import AsyncDataStore, EventSource, Job
@@ -27,21 +32,26 @@ from ..validators import positive_integer
@attrs.define(eq=False)
class AsyncWorker:
"""Runs jobs locally in a task group."""
+
data_store: AsyncDataStore = attrs.field(converter=as_async_datastore)
- max_concurrent_jobs: int = attrs.field(kw_only=True, validator=positive_integer, default=100)
+ max_concurrent_jobs: int = attrs.field(
+ kw_only=True, validator=positive_integer, default=100
+ )
identity: str = attrs.field(kw_only=True, default=None)
logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
_state: RunState = attrs.field(init=False, default=RunState.stopped)
_wakeup_event: anyio.Event = attrs.field(init=False, factory=anyio.Event)
_acquired_jobs: set[Job] = attrs.field(init=False, factory=set)
- _events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker)
+ _events: LocalAsyncEventBroker = attrs.field(
+ init=False, factory=LocalAsyncEventBroker
+ )
_running_jobs: set[UUID] = attrs.field(init=False, factory=set)
_exit_stack: AsyncExitStack = attrs.field(init=False)
def __attrs_post_init__(self) -> None:
if not self.identity:
- self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}'
+ self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
@property
def events(self) -> EventSource:
@@ -60,11 +70,15 @@ class AsyncWorker:
# Initialize the data store and start relaying events to the worker's event broker
await self._exit_stack.enter_async_context(self.data_store)
- self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(self._events.publish)
+ )
# Wake up the worker if the data store emits a significant job event
self._exit_stack.enter_context(
- self.data_store.events.subscribe(lambda event: self._wakeup_event.set(), {JobAdded})
+ self.data_store.events.subscribe(
+ lambda event: self._wakeup_event.set(), {JobAdded}
+ )
)
# Start the actual worker
@@ -81,8 +95,10 @@ class AsyncWorker:
async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
if self._state is not RunState.starting:
- raise RuntimeError(f'This function cannot be called while the worker is in the '
- f'{self._state} state')
+ raise RuntimeError(
+ f"This function cannot be called while the worker is in the "
+ f"{self._state} state"
+ )
# Set the current worker
token = current_worker.set(self)
@@ -108,14 +124,14 @@ class AsyncWorker:
except get_cancelled_exc_class():
pass
except BaseException as exc:
- self.logger.exception('Worker crashed')
+ self.logger.exception("Worker crashed")
exception = exc
else:
- self.logger.info('Worker stopped')
+ self.logger.info("Worker stopped")
finally:
current_worker.reset(token)
self._state = RunState.stopped
- self.logger.exception('Worker crashed')
+ self.logger.exception("Worker crashed")
with move_on_after(3, shield=True):
await self._events.publish(WorkerStopped(exception=exception))
@@ -124,7 +140,9 @@ class AsyncWorker:
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- result = JobResult(job_id=job.id, outcome=JobOutcome.missed_start_deadline)
+ result = JobResult(
+ job_id=job.id, outcome=JobOutcome.missed_start_deadline
+ )
await self.data_store.release_job(self.identity, job.task_id, result)
return
@@ -136,14 +154,20 @@ class AsyncWorker:
except get_cancelled_exc_class():
with CancelScope(shield=True):
result = JobResult(job_id=job.id, outcome=JobOutcome.cancelled)
- await self.data_store.release_job(self.identity, job.task_id, result)
+ await self.data_store.release_job(
+ self.identity, job.task_id, result
+ )
except BaseException as exc:
- result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc)
+ result = JobResult(
+ job_id=job.id, outcome=JobOutcome.error, exception=exc
+ )
await self.data_store.release_job(self.identity, job.task_id, result)
if not isinstance(exc, Exception):
raise
else:
- result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval)
+ result = JobResult(
+ job_id=job.id, outcome=JobOutcome.success, return_value=retval
+ )
await self.data_store.release_job(self.identity, job.task_id, result)
finally:
job_info.reset(token)
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 7a6bee9..2da0045 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -25,8 +25,11 @@ from ..validators import positive_integer
@attrs.define(eq=False)
class Worker:
"""Runs jobs locally in a thread pool."""
+
data_store: DataStore
- max_concurrent_jobs: int = attrs.field(kw_only=True, validator=positive_integer, default=20)
+ max_concurrent_jobs: int = attrs.field(
+ kw_only=True, validator=positive_integer, default=20
+ )
identity: str = attrs.field(kw_only=True, default=None)
logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
@@ -40,7 +43,7 @@ class Worker:
def __attrs_post_init__(self) -> None:
if not self.identity:
- self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}'
+ self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
@property
def events(self) -> EventSource:
@@ -59,11 +62,15 @@ class Worker:
# Initialize the data store and start relaying events to the worker's event broker
self._exit_stack.enter_context(self.data_store)
- self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(self._events.publish)
+ )
# Wake up the worker if the data store emits a significant job event
self._exit_stack.enter_context(
- self.data_store.events.subscribe(lambda event: self._wakeup_event.set(), {JobAdded})
+ self.data_store.events.subscribe(
+ lambda event: self._wakeup_event.set(), {JobAdded}
+ )
)
# Start the worker and return when it has signalled readiness or raised an exception
@@ -87,8 +94,10 @@ class Worker:
def run(self) -> None:
if self._state is not RunState.starting:
- raise RuntimeError(f'This function cannot be called while the worker is in the '
- f'{self._state} state')
+ raise RuntimeError(
+ f"This function cannot be called while the worker is in the "
+ f"{self._state} state"
+ )
# Set the current worker
token = current_worker.set(self)
@@ -107,15 +116,17 @@ class Worker:
for job in jobs:
task = self.data_store.get_task(job.task_id)
self._running_jobs.add(job.id)
- executor.submit(copy_context().run, self._run_job, job, task.func)
+ executor.submit(
+ copy_context().run, self._run_job, job, task.func
+ )
self._wakeup_event.wait()
self._wakeup_event = threading.Event()
except BaseException as exc:
- self.logger.exception('Worker crashed')
+ self.logger.exception("Worker crashed")
exception = exc
else:
- self.logger.info('Worker stopped')
+ self.logger.info("Worker stopped")
finally:
current_worker.reset(token)
self._state = RunState.stopped
@@ -127,7 +138,9 @@ class Worker:
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- result = JobResult(job_id=job.id, outcome=JobOutcome.missed_start_deadline)
+ result = JobResult(
+ job_id=job.id, outcome=JobOutcome.missed_start_deadline
+ )
self.data_store.release_job(self.identity, job.task_id, result)
return
@@ -135,12 +148,16 @@ class Worker:
try:
retval = func(*job.args, **job.kwargs)
except BaseException as exc:
- result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc)
+ result = JobResult(
+ job_id=job.id, outcome=JobOutcome.error, exception=exc
+ )
self.data_store.release_job(self.identity, job.task_id, result)
if not isinstance(exc, Exception):
raise
else:
- result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval)
+ result = JobResult(
+ job_id=job.id, outcome=JobOutcome.success, return_value=retval
+ )
self.data_store.release_job(self.identity, job.task_id, result)
finally:
job_info.reset(token)
diff --git a/tests/conftest.py b/tests/conftest.py
index 37c363f..31ea9b0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -15,20 +15,22 @@ else:
from backports.zoneinfo import ZoneInfo
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
def timezone() -> ZoneInfo:
- return ZoneInfo('Europe/Berlin')
+ return ZoneInfo("Europe/Berlin")
-@pytest.fixture(params=[
- pytest.param(PickleSerializer, id='pickle'),
- pytest.param(CBORSerializer, id='cbor'),
- pytest.param(JSONSerializer, id='json')
-])
+@pytest.fixture(
+ params=[
+ pytest.param(PickleSerializer, id="pickle"),
+ pytest.param(CBORSerializer, id="cbor"),
+ pytest.param(JSONSerializer, id="json"),
+ ]
+)
def serializer(request) -> Serializer | None:
return request.param() if request.param else None
@pytest.fixture
def anyio_backend() -> str:
- return 'asyncio'
+ return "asyncio"
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index e8584a3..2b8faea 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -15,7 +15,13 @@ from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome
from apscheduler.events import (
- Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskUpdated)
+ Event,
+ ScheduleAdded,
+ ScheduleRemoved,
+ ScheduleUpdated,
+ TaskAdded,
+ TaskUpdated,
+)
from apscheduler.structures import JobResult, Task
from apscheduler.triggers.date import DateTrigger
@@ -41,8 +47,8 @@ def sqlite_store() -> DataStore:
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
- with TemporaryDirectory('sqlite_') as tempdir:
- engine = create_engine(f'sqlite:///{tempdir}/test.db')
+ with TemporaryDirectory("sqlite_") as tempdir:
+ engine = create_engine(f"sqlite:///{tempdir}/test.db")
try:
yield SQLAlchemyDataStore(engine)
finally:
@@ -55,7 +61,7 @@ def psycopg2_store() -> DataStore:
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
- engine = create_engine('postgresql+psycopg2://postgres:secret@localhost/testdb')
+ engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb")
try:
yield SQLAlchemyDataStore(engine, start_from_scratch=True)
finally:
@@ -68,7 +74,7 @@ def mysql_store() -> DataStore:
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
- engine = create_engine('mysql+pymysql://root:secret@localhost/testdb')
+ engine = create_engine("mysql+pymysql://root:secret@localhost/testdb")
try:
yield SQLAlchemyDataStore(engine, start_from_scratch=True)
finally:
@@ -81,46 +87,74 @@ async def asyncpg_store() -> AsyncDataStore:
from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
- engine = create_async_engine('postgresql+asyncpg://postgres:secret@localhost/testdb',
- future=True)
+ engine = create_async_engine(
+ "postgresql+asyncpg://postgres:secret@localhost/testdb", future=True
+ )
try:
yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True)
finally:
await engine.dispose()
-@pytest.fixture(params=[
- pytest.param(lazy_fixture('memory_store'), id='memory'),
- pytest.param(lazy_fixture('sqlite'), id='sqlite'),
- pytest.param(lazy_fixture('mongodb_store'), id='mongodb',
- marks=[pytest.mark.external_service]),
- pytest.param(lazy_fixture('psycopg2_store'), id='psycopg2',
- marks=[pytest.mark.external_service]),
- pytest.param(lazy_fixture('mysql_store'), id='mysql',
- marks=[pytest.mark.external_service])
-])
+@pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("memory_store"), id="memory"),
+ pytest.param(lazy_fixture("sqlite"), id="sqlite"),
+ pytest.param(
+ lazy_fixture("mongodb_store"),
+ id="mongodb",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("psycopg2_store"),
+ id="psycopg2",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("mysql_store"),
+ id="mysql",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+)
def sync_store(request) -> DataStore:
return request.param
-@pytest.fixture(params=[
- pytest.param(lazy_fixture('asyncpg_store'), id='asyncpg',
- marks=[pytest.mark.external_service])
-])
+@pytest.fixture(
+ params=[
+ pytest.param(
+ lazy_fixture("asyncpg_store"),
+ id="asyncpg",
+ marks=[pytest.mark.external_service],
+ )
+ ]
+)
def async_store(request) -> AsyncDataStore:
return request.param
-@pytest.fixture(params=[
- pytest.param(lazy_fixture('memory_store'), id='memory'),
- pytest.param(lazy_fixture('sqlite_store'), id='sqlite'),
- pytest.param(lazy_fixture('mongodb_store'), id='mongodb',
- marks=[pytest.mark.external_service]),
- pytest.param(lazy_fixture('psycopg2_store'), id='psycopg2',
- marks=[pytest.mark.external_service]),
- pytest.param(lazy_fixture('mysql_store'), id='mysql',
- marks=[pytest.mark.external_service])
-])
+@pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("memory_store"), id="memory"),
+ pytest.param(lazy_fixture("sqlite_store"), id="sqlite"),
+ pytest.param(
+ lazy_fixture("mongodb_store"),
+ id="mongodb",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("psycopg2_store"),
+ id="psycopg2",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("mysql_store"),
+ id="mysql",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+)
async def datastore(request):
if isinstance(request.param, DataStore):
return AsyncDataStoreAdapter(request.param)
@@ -131,22 +165,21 @@ async def datastore(request):
@pytest.fixture
def schedules() -> list[Schedule]:
trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule1 = Schedule(id='s1', task_id='task1', trigger=trigger)
+ schedule1 = Schedule(id="s1", task_id="task1", trigger=trigger)
schedule1.next_fire_time = trigger.next()
trigger = DateTrigger(datetime(2020, 9, 14, tzinfo=timezone.utc))
- schedule2 = Schedule(id='s2', task_id='task2', trigger=trigger)
+ schedule2 = Schedule(id="s2", task_id="task2", trigger=trigger)
schedule2.next_fire_time = trigger.next()
trigger = DateTrigger(datetime(2020, 9, 15, tzinfo=timezone.utc))
- schedule3 = Schedule(id='s3', task_id='task1', trigger=trigger)
+ schedule3 = Schedule(id="s3", task_id="task1", trigger=trigger)
return [schedule1, schedule2, schedule3]
@asynccontextmanager
async def capture_events(
- datastore: AsyncDataStore, limit: int,
- event_types: set[type[Event]] | None = None
+ datastore: AsyncDataStore, limit: int, event_types: set[type[Event]] | None = None
) -> AsyncGenerator[list[Event], None]:
def listener(event: Event) -> None:
events.append(event)
@@ -170,62 +203,71 @@ class TestAsyncStores:
event_types = {TaskAdded, TaskUpdated}
async with datastore, capture_events(datastore, 3, event_types) as events:
- await datastore.add_task(Task(id='test_task', func=print))
- await datastore.add_task(Task(id='test_task2', func=math.ceil))
- await datastore.add_task(Task(id='test_task', func=repr))
+ await datastore.add_task(Task(id="test_task", func=print))
+ await datastore.add_task(Task(id="test_task2", func=math.ceil))
+ await datastore.add_task(Task(id="test_task", func=repr))
tasks = await datastore.get_tasks()
assert len(tasks) == 2
- assert tasks[0].id == 'test_task'
+ assert tasks[0].id == "test_task"
assert tasks[0].func is repr
- assert tasks[1].id == 'test_task2'
+ assert tasks[1].id == "test_task2"
assert tasks[1].func is math.ceil
received_event = events.pop(0)
assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == 'test_task'
+ assert received_event.task_id == "test_task"
received_event = events.pop(0)
assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == 'test_task2'
+ assert received_event.task_id == "test_task2"
received_event = events.pop(0)
assert isinstance(received_event, TaskUpdated)
- assert received_event.task_id == 'test_task'
+ assert received_event.task_id == "test_task"
assert not events
- async def test_add_schedules(self, datastore: AsyncDataStore,
- schedules: list[Schedule]) -> None:
+ async def test_add_schedules(
+ self, datastore: AsyncDataStore, schedules: list[Schedule]
+ ) -> None:
async with datastore, capture_events(datastore, 3, {ScheduleAdded}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
assert await datastore.get_schedules() == schedules
- assert await datastore.get_schedules({'s1', 's2', 's3'}) == schedules
- assert await datastore.get_schedules({'s1'}) == [schedules[0]]
- assert await datastore.get_schedules({'s2'}) == [schedules[1]]
- assert await datastore.get_schedules({'s3'}) == [schedules[2]]
+ assert await datastore.get_schedules({"s1", "s2", "s3"}) == schedules
+ assert await datastore.get_schedules({"s1"}) == [schedules[0]]
+ assert await datastore.get_schedules({"s2"}) == [schedules[1]]
+ assert await datastore.get_schedules({"s3"}) == [schedules[2]]
for event, schedule in zip(events, schedules):
assert event.schedule_id == schedule.id
assert event.next_fire_time == schedule.next_fire_time
- async def test_replace_schedules(self, datastore: AsyncDataStore,
- schedules: list[Schedule]) -> None:
+ async def test_replace_schedules(
+ self, datastore: AsyncDataStore, schedules: list[Schedule]
+ ) -> None:
async with datastore, capture_events(datastore, 1, {ScheduleUpdated}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
next_fire_time = schedules[2].trigger.next()
- schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(),
- kwargs={}, coalesce=CoalescePolicy.earliest,
- misfire_grace_time=None, tags=frozenset())
+ schedule = Schedule(
+ id="s3",
+ task_id="foo",
+ trigger=schedules[2].trigger,
+ args=(),
+ kwargs={},
+ coalesce=CoalescePolicy.earliest,
+ misfire_grace_time=None,
+ tags=frozenset(),
+ )
schedule.next_fire_time = next_fire_time
await datastore.add_schedule(schedule, ConflictPolicy.replace)
schedules = await datastore.get_schedules({schedule.id})
- assert schedules[0].task_id == 'foo'
+ assert schedules[0].task_id == "foo"
assert schedules[0].next_fire_time == next_fire_time
assert schedules[0].args == ()
assert schedules[0].kwargs == {}
@@ -234,47 +276,51 @@ class TestAsyncStores:
assert schedules[0].tags == frozenset()
received_event = events.pop(0)
- assert received_event.schedule_id == 's3'
- assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
+ assert received_event.schedule_id == "s3"
+ assert received_event.next_fire_time == datetime(
+ 2020, 9, 15, tzinfo=timezone.utc
+ )
assert not events
- async def test_remove_schedules(self, datastore: AsyncDataStore,
- schedules: list[Schedule]) -> None:
+ async def test_remove_schedules(
+ self, datastore: AsyncDataStore, schedules: list[Schedule]
+ ) -> None:
async with datastore, capture_events(datastore, 2, {ScheduleRemoved}) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
- await datastore.remove_schedules(['s1', 's2'])
+ await datastore.remove_schedules(["s1", "s2"])
assert await datastore.get_schedules() == [schedules[2]]
received_event = events.pop(0)
- assert received_event.schedule_id == 's1'
+ assert received_event.schedule_id == "s1"
received_event = events.pop(0)
- assert received_event.schedule_id == 's2'
+ assert received_event.schedule_id == "s2"
assert not events
@pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
async def test_acquire_release_schedules(
- self, datastore: AsyncDataStore, schedules: list[Schedule]) -> None:
+ self, datastore: AsyncDataStore, schedules: list[Schedule]
+ ) -> None:
event_types = {ScheduleRemoved, ScheduleUpdated}
async with datastore, capture_events(datastore, 2, event_types) as events:
for schedule in schedules:
await datastore.add_schedule(schedule, ConflictPolicy.exception)
# The first scheduler gets the first due schedule
- schedules1 = await datastore.acquire_schedules('dummy-id1', 1)
+ schedules1 = await datastore.acquire_schedules("dummy-id1", 1)
assert len(schedules1) == 1
- assert schedules1[0].id == 's1'
+ assert schedules1[0].id == "s1"
# The second scheduler gets the second due schedule
- schedules2 = await datastore.acquire_schedules('dummy-id2', 1)
+ schedules2 = await datastore.acquire_schedules("dummy-id2", 1)
assert len(schedules2) == 1
- assert schedules2[0].id == 's2'
+ assert schedules2[0].id == "s2"
# The third scheduler gets nothing
- schedules3 = await datastore.acquire_schedules('dummy-id3', 1)
+ schedules3 = await datastore.acquire_schedules("dummy-id3", 1)
assert not schedules3
# Update the schedules and check that the job store actually deletes the first
@@ -283,24 +329,26 @@ class TestAsyncStores:
schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
# Release all the schedules
- await datastore.release_schedules('dummy-id1', schedules1)
- await datastore.release_schedules('dummy-id2', schedules2)
+ await datastore.release_schedules("dummy-id1", schedules1)
+ await datastore.release_schedules("dummy-id2", schedules2)
# Check that the first schedule is gone
schedules = await datastore.get_schedules()
assert len(schedules) == 2
- assert schedules[0].id == 's2'
- assert schedules[1].id == 's3'
+ assert schedules[0].id == "s2"
+ assert schedules[1].id == "s3"
# Check for the appropriate update and delete events
received_event = events.pop(0)
assert isinstance(received_event, ScheduleRemoved)
- assert received_event.schedule_id == 's1'
+ assert received_event.schedule_id == "s1"
received_event = events.pop(0)
assert isinstance(received_event, ScheduleUpdated)
- assert received_event.schedule_id == 's2'
- assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
+ assert received_event.schedule_id == "s2"
+ assert received_event.next_fire_time == datetime(
+ 2020, 9, 15, tzinfo=timezone.utc
+ )
assert not events
@@ -311,20 +359,21 @@ class TestAsyncStores:
async with datastore:
for i in range(1, 3):
trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
- schedule = Schedule(id=f's{i}', task_id='task1', trigger=trigger)
+ schedule = Schedule(id=f"s{i}", task_id="task1", trigger=trigger)
schedule.next_fire_time = trigger.next()
await datastore.add_schedule(schedule, ConflictPolicy.exception)
- schedules = await datastore.acquire_schedules('foo', 3)
+ schedules = await datastore.acquire_schedules("foo", 3)
schedules[0].next_fire_time = None
- await datastore.release_schedules('foo', schedules)
+ await datastore.release_schedules("foo", schedules)
remaining = await datastore.get_schedules({s.id for s in schedules})
assert len(remaining) == 1
assert remaining[0].id == schedules[1].id
async def test_acquire_schedules_lock_timeout(
- self, datastore: AsyncDataStore, schedules: list[Schedule], freezer) -> None:
+ self, datastore: AsyncDataStore, schedules: list[Schedule], freezer
+ ) -> None:
"""
Test that a scheduler can acquire schedules that were acquired by another scheduler but
not released within the lock timeout period.
@@ -334,60 +383,66 @@ class TestAsyncStores:
await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
# First, one scheduler acquires the first available schedule
- acquired1 = await datastore.acquire_schedules('dummy-id1', 1)
+ acquired1 = await datastore.acquire_schedules("dummy-id1", 1)
assert len(acquired1) == 1
- assert acquired1[0].id == 's1'
+ assert acquired1[0].id == "s1"
# Try to acquire the schedule just at the threshold (now == acquired_until).
# This should not yield any schedules.
freezer.tick(30)
- acquired2 = await datastore.acquire_schedules('dummy-id2', 1)
+ acquired2 = await datastore.acquire_schedules("dummy-id2", 1)
assert not acquired2
# Right after that, the schedule should be available
freezer.tick(1)
- acquired3 = await datastore.acquire_schedules('dummy-id2', 1)
+ acquired3 = await datastore.acquire_schedules("dummy-id2", 1)
assert len(acquired3) == 1
- assert acquired3[0].id == 's1'
+ assert acquired3[0].id == "s1"
async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None:
async with datastore:
- await datastore.add_task(Task(id='task1', func=asynccontextmanager))
- jobs = [Job(task_id='task1') for _ in range(2)]
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ jobs = [Job(task_id="task1") for _ in range(2)]
for job in jobs:
await datastore.add_job(job)
# The first worker gets the first job in the queue
- jobs1 = await datastore.acquire_jobs('worker1', 1)
+ jobs1 = await datastore.acquire_jobs("worker1", 1)
assert len(jobs1) == 1
assert jobs1[0].id == jobs[0].id
# The second worker gets the second job
- jobs2 = await datastore.acquire_jobs('worker2', 1)
+ jobs2 = await datastore.acquire_jobs("worker2", 1)
assert len(jobs2) == 1
assert jobs2[0].id == jobs[1].id
# The third worker gets nothing
- jobs3 = await datastore.acquire_jobs('worker3', 1)
+ jobs3 = await datastore.acquire_jobs("worker3", 1)
assert not jobs3
async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
async with datastore:
- await datastore.add_task(Task(id='task1', func=asynccontextmanager))
- job = Job(task_id='task1')
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
await datastore.add_job(job)
- acquired = await datastore.acquire_jobs('worker_id', 2)
+ acquired = await datastore.acquire_jobs("worker_id", 2)
assert len(acquired) == 1
assert acquired[0].id == job.id
await datastore.release_job(
- 'worker_id', acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.success, return_value='foo'))
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.success,
+ return_value="foo",
+ ),
+ )
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.success
assert result.exception is None
- assert result.return_value == 'foo'
+ assert result.return_value == "foo"
# Check that the job and its result are gone
assert not await datastore.get_jobs({acquired[0].id})
@@ -395,22 +450,27 @@ class TestAsyncStores:
async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
async with datastore:
- await datastore.add_task(Task(id='task1', func=asynccontextmanager))
- job = Job(task_id='task1')
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
await datastore.add_job(job)
- acquired = await datastore.acquire_jobs('worker_id', 2)
+ acquired = await datastore.acquire_jobs("worker_id", 2)
assert len(acquired) == 1
assert acquired[0].id == job.id
await datastore.release_job(
- 'worker_id', acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.error,
- exception=ValueError('foo')))
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id,
+ outcome=JobOutcome.error,
+ exception=ValueError("foo"),
+ ),
+ )
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.error
assert isinstance(result.exception, ValueError)
- assert result.exception.args == ('foo',)
+ assert result.exception.args == ("foo",)
assert result.return_value is None
# Check that the job and its result are gone
@@ -419,17 +479,21 @@ class TestAsyncStores:
async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
async with datastore:
- await datastore.add_task(Task(id='task1', func=asynccontextmanager))
- job = Job(task_id='task1')
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
await datastore.add_job(job)
- acquired = await datastore.acquire_jobs('worker_id', 2)
+ acquired = await datastore.acquire_jobs("worker_id", 2)
assert len(acquired) == 1
assert acquired[0].id == job.id
await datastore.release_job(
- 'worker_id', acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline))
+ "worker_id",
+ acquired[0].task_id,
+ JobResult(
+ job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline
+ ),
+ )
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
assert result.exception is None
@@ -441,17 +505,19 @@ class TestAsyncStores:
async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
async with datastore:
- await datastore.add_task(Task(id='task1', func=asynccontextmanager))
- job = Job(task_id='task1')
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
await datastore.add_job(job)
- acquired = await datastore.acquire_jobs('worker1', 2)
+ acquired = await datastore.acquire_jobs("worker1", 2)
assert len(acquired) == 1
assert acquired[0].id == job.id
await datastore.release_job(
- 'worker1', acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled))
+ "worker1",
+ acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ )
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
assert result.exception is None
@@ -461,50 +527,59 @@ class TestAsyncStores:
assert not await datastore.get_jobs({acquired[0].id})
assert not await datastore.get_job_result(acquired[0].id)
- async def test_acquire_jobs_lock_timeout(self, datastore: AsyncDataStore,
- freezer: FrozenDateTimeFactory) -> None:
+ async def test_acquire_jobs_lock_timeout(
+ self, datastore: AsyncDataStore, freezer: FrozenDateTimeFactory
+ ) -> None:
"""
        Test that a worker can acquire jobs that were acquired by another worker but not
released within the lock timeout period.
"""
async with datastore:
- await datastore.add_task(Task(id='task1', func=asynccontextmanager))
- job = Job(task_id='task1')
+ await datastore.add_task(Task(id="task1", func=asynccontextmanager))
+ job = Job(task_id="task1")
await datastore.add_job(job)
# First, one worker acquires the first available job
- acquired = await datastore.acquire_jobs('worker1', 1)
+ acquired = await datastore.acquire_jobs("worker1", 1)
assert len(acquired) == 1
assert acquired[0].id == job.id
# Try to acquire the job just at the threshold (now == acquired_until).
# This should not yield any jobs.
freezer.tick(30)
- assert not await datastore.acquire_jobs('worker2', 1)
+ assert not await datastore.acquire_jobs("worker2", 1)
# Right after that, the job should be available
freezer.tick(1)
- acquired = await datastore.acquire_jobs('worker2', 1)
+ acquired = await datastore.acquire_jobs("worker2", 1)
assert len(acquired) == 1
assert acquired[0].id == job.id
- async def test_acquire_jobs_max_number_exceeded(self, datastore: AsyncDataStore) -> None:
+ async def test_acquire_jobs_max_number_exceeded(
+ self, datastore: AsyncDataStore
+ ) -> None:
async with datastore:
await datastore.add_task(
- Task(id='task1', func=asynccontextmanager, max_running_jobs=2))
- jobs = [Job(task_id='task1'), Job(task_id='task1'), Job(task_id='task1')]
+ Task(id="task1", func=asynccontextmanager, max_running_jobs=2)
+ )
+ jobs = [Job(task_id="task1"), Job(task_id="task1"), Job(task_id="task1")]
for job in jobs:
await datastore.add_job(job)
            # Check that only 2 jobs are returned from acquire_jobs() even though the limit was 3
- acquired_jobs = await datastore.acquire_jobs('worker1', 3)
+ acquired_jobs = await datastore.acquire_jobs("worker1", 3)
assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
# Release one job, and the worker should be able to acquire the third job
await datastore.release_job(
- 'worker1', acquired_jobs[0].task_id,
- JobResult(job_id=acquired_jobs[0].id, outcome=JobOutcome.success,
- return_value=None))
- acquired_jobs = await datastore.acquire_jobs('worker1', 3)
+ "worker1",
+ acquired_jobs[0].task_id,
+ JobResult(
+ job_id=acquired_jobs[0].id,
+ outcome=JobOutcome.success,
+ return_value=None,
+ ),
+ )
+ acquired_jobs = await datastore.acquire_jobs("worker1", 3)
assert [job.id for job in acquired_jobs] == [jobs[2].id]
diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py
index 1d29ede..7ec90ab 100644
--- a/tests/test_eventbrokers.py
+++ b/tests/test_eventbrokers.py
@@ -33,7 +33,7 @@ def local_async_broker() -> AsyncEventBroker:
def redis_broker(serializer: Serializer) -> EventBroker:
from apscheduler.eventbrokers.redis import RedisEventBroker
- broker = RedisEventBroker.from_url('redis://localhost:6379')
+ broker = RedisEventBroker.from_url("redis://localhost:6379")
broker.serializer = serializer
return broker
@@ -53,29 +53,40 @@ async def asyncpg_broker(serializer: Serializer) -> AsyncEventBroker:
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
- pool = await create_pool('postgres://postgres:secret@localhost:5432/testdb')
+ pool = await create_pool("postgres://postgres:secret@localhost:5432/testdb")
broker = AsyncpgEventBroker.from_asyncpg_pool(pool)
broker.serializer = serializer
yield broker
await pool.close()
-@pytest.fixture(params=[
- pytest.param(lazy_fixture('local_broker'), id='local'),
- pytest.param(lazy_fixture('redis_broker'), id='redis',
- marks=[pytest.mark.external_service]),
- pytest.param(lazy_fixture('mqtt_broker'), id='mqtt',
- marks=[pytest.mark.external_service])
-])
+@pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("local_broker"), id="local"),
+ pytest.param(
+ lazy_fixture("redis_broker"),
+ id="redis",
+ marks=[pytest.mark.external_service],
+ ),
+ pytest.param(
+ lazy_fixture("mqtt_broker"), id="mqtt", marks=[pytest.mark.external_service]
+ ),
+ ]
+)
def broker(request: SubRequest) -> Callable[[], EventBroker]:
return request.param
-@pytest.fixture(params=[
- pytest.param(lazy_fixture('local_async_broker'), id='local'),
- pytest.param(lazy_fixture('asyncpg_broker'), id='asyncpg',
- marks=[pytest.mark.external_service])
-])
+@pytest.fixture(
+ params=[
+ pytest.param(lazy_fixture("local_async_broker"), id="local"),
+ pytest.param(
+ lazy_fixture("asyncpg_broker"),
+ id="asyncpg",
+ marks=[pytest.mark.external_service],
+ ),
+ ]
+)
def async_broker(request: SubRequest) -> Callable[[], AsyncEventBroker]:
return request.param
@@ -87,8 +98,9 @@ class TestEventBroker:
broker.subscribe(queue.put_nowait)
broker.subscribe(queue.put_nowait)
event = ScheduleAdded(
- schedule_id='schedule1',
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
broker.publish(event)
event1 = queue.get(timeout=3)
event2 = queue.get(timeout=1)
@@ -96,27 +108,31 @@ class TestEventBroker:
assert event1 == event2
assert isinstance(event1, ScheduleAdded)
assert isinstance(event1.timestamp, datetime)
- assert event1.schedule_id == 'schedule1'
- assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+ assert event1.schedule_id == "schedule1"
+ assert event1.next_fire_time == datetime(
+ 2021, 9, 11, 12, 31, 56, 254867, timezone.utc
+ )
def test_subscribe_one_shot(self, broker: EventBroker) -> None:
queue: Queue[Event] = Queue()
with broker:
broker.subscribe(queue.put_nowait, one_shot=True)
event = ScheduleAdded(
- schedule_id='schedule1',
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
broker.publish(event)
event = ScheduleAdded(
- schedule_id='schedule2',
- next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc))
+ schedule_id="schedule2",
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
+ )
broker.publish(event)
received_event = queue.get(timeout=3)
with pytest.raises(Empty):
queue.get(timeout=0.1)
assert isinstance(received_event, ScheduleAdded)
- assert received_event.schedule_id == 'schedule1'
+ assert received_event.schedule_id == "schedule1"
def test_unsubscribe(self, broker: EventBroker, caplog) -> None:
queue: Queue[Event] = Queue()
@@ -130,15 +146,19 @@ class TestEventBroker:
with pytest.raises(Empty):
queue.get(timeout=0.1)
- def test_publish_no_subscribers(self, broker: EventBroker, caplog: LogCaptureFixture) -> None:
+ def test_publish_no_subscribers(
+ self, broker: EventBroker, caplog: LogCaptureFixture
+ ) -> None:
with broker:
broker.publish(Event())
assert not caplog.text
- def test_publish_exception(self, broker: EventBroker, caplog: LogCaptureFixture) -> None:
+ def test_publish_exception(
+ self, broker: EventBroker, caplog: LogCaptureFixture
+ ) -> None:
def bad_subscriber(event: Event) -> None:
- raise Exception('foo')
+ raise Exception("foo")
timestamp = datetime.now(timezone.utc)
event_future: Future[Event] = Future()
@@ -150,7 +170,7 @@ class TestEventBroker:
event = event_future.result(3)
assert isinstance(event, Event)
assert event.timestamp == timestamp
- assert 'Error delivering Event' in caplog.text
+ assert "Error delivering Event" in caplog.text
@pytest.mark.anyio
@@ -161,8 +181,9 @@ class TestAsyncEventBroker:
async_broker.subscribe(send.send)
async_broker.subscribe(send.send_nowait)
event = ScheduleAdded(
- schedule_id='schedule1',
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
await async_broker.publish(event)
with fail_after(3):
@@ -172,20 +193,24 @@ class TestAsyncEventBroker:
assert event1 == event2
assert isinstance(event1, ScheduleAdded)
assert isinstance(event1.timestamp, datetime)
- assert event1.schedule_id == 'schedule1'
- assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc)
+ assert event1.schedule_id == "schedule1"
+ assert event1.next_fire_time == datetime(
+ 2021, 9, 11, 12, 31, 56, 254867, timezone.utc
+ )
async def test_subscribe_one_shot(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream(2)
async with async_broker:
async_broker.subscribe(send.send, one_shot=True)
event = ScheduleAdded(
- schedule_id='schedule1',
- next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc))
+ schedule_id="schedule1",
+ next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
+ )
await async_broker.publish(event)
event = ScheduleAdded(
- schedule_id='schedule2',
- next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc))
+ schedule_id="schedule2",
+ next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
+ )
await async_broker.publish(event)
with fail_after(3):
@@ -195,7 +220,7 @@ class TestAsyncEventBroker:
await receive.receive()
assert isinstance(received_event, ScheduleAdded)
- assert received_event.schedule_id == 'schedule1'
+ assert received_event.schedule_id == "schedule1"
async def test_unsubscribe(self, async_broker: AsyncEventBroker) -> None:
send, receive = create_memory_object_stream()
@@ -210,17 +235,19 @@ class TestAsyncEventBroker:
with pytest.raises(TimeoutError), fail_after(0.1):
await receive.receive()
- async def test_publish_no_subscribers(self, async_broker: AsyncEventBroker,
- caplog: LogCaptureFixture) -> None:
+ async def test_publish_no_subscribers(
+ self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture
+ ) -> None:
async with async_broker:
await async_broker.publish(Event())
assert not caplog.text
- async def test_publish_exception(self, async_broker: AsyncEventBroker,
- caplog: LogCaptureFixture) -> None:
+ async def test_publish_exception(
+ self, async_broker: AsyncEventBroker, caplog: LogCaptureFixture
+ ) -> None:
def bad_subscriber(event: Event) -> None:
- raise Exception('foo')
+ raise Exception("foo")
timestamp = datetime.now(timezone.utc)
send, receive = create_memory_object_stream()
@@ -231,4 +258,4 @@ class TestAsyncEventBroker:
received_event = await receive.receive()
assert received_event.timestamp == timestamp
- assert 'Error delivering Event' in caplog.text
+ assert "Error delivering Event" in caplog.text
diff --git a/tests/test_marshalling.py b/tests/test_marshalling.py
index c209388..36c8aca 100644
--- a/tests/test_marshalling.py
+++ b/tests/test_marshalling.py
@@ -39,10 +39,14 @@ class InheritedDummyClass(DummyClass):
class TestCallableToRef:
- @pytest.mark.parametrize('obj, error', [
- (partial(DummyClass.meth), 'Cannot create a reference to a partial()'),
- (lambda: None, 'Cannot create a reference to a lambda')
- ], ids=['partial', 'lambda'])
+ @pytest.mark.parametrize(
+ "obj, error",
+ [
+ (partial(DummyClass.meth), "Cannot create a reference to a partial()"),
+ (lambda: None, "Cannot create a reference to a lambda"),
+ ],
+ ids=["partial", "lambda"],
+ )
def test_errors(self, obj, error):
exc = pytest.raises(SerializationError, callable_to_ref, obj)
assert str(exc.value) == error
@@ -52,18 +56,33 @@ class TestCallableToRef:
pass
exc = pytest.raises(SerializationError, callable_to_ref, nested)
- assert str(exc.value) == 'Cannot create a reference to a nested function'
-
- @pytest.mark.parametrize('input,expected', [
- (DummyClass.meth, 'test_marshalling:DummyClass.meth'),
- (DummyClass.classmeth, 'test_marshalling:DummyClass.classmeth'),
- (DummyClass.InnerDummyClass.innerclassmeth,
- 'test_marshalling:DummyClass.InnerDummyClass.innerclassmeth'),
- (DummyClass.staticmeth, 'test_marshalling:DummyClass.staticmeth'),
- (InheritedDummyClass.classmeth, 'test_marshalling:InheritedDummyClass.classmeth'),
- (timedelta, 'datetime:timedelta'),
- ], ids=['unbound method', 'class method', 'inner class method', 'static method',
- 'inherited class method', 'timedelta'])
+ assert str(exc.value) == "Cannot create a reference to a nested function"
+
+ @pytest.mark.parametrize(
+ "input,expected",
+ [
+ (DummyClass.meth, "test_marshalling:DummyClass.meth"),
+ (DummyClass.classmeth, "test_marshalling:DummyClass.classmeth"),
+ (
+ DummyClass.InnerDummyClass.innerclassmeth,
+ "test_marshalling:DummyClass.InnerDummyClass.innerclassmeth",
+ ),
+ (DummyClass.staticmeth, "test_marshalling:DummyClass.staticmeth"),
+ (
+ InheritedDummyClass.classmeth,
+ "test_marshalling:InheritedDummyClass.classmeth",
+ ),
+ (timedelta, "datetime:timedelta"),
+ ],
+ ids=[
+ "unbound method",
+ "class method",
+ "inner class method",
+ "static method",
+ "inherited class method",
+ "timedelta",
+ ],
+ )
def test_valid_refs(self, input, expected):
assert callable_to_ref(input) == expected
@@ -71,21 +90,25 @@ class TestCallableToRef:
class TestCallableFromRef:
def test_valid_ref(self):
from logging.handlers import RotatingFileHandler
- assert callable_from_ref('logging.handlers:RotatingFileHandler') is RotatingFileHandler
+
+ assert (
+ callable_from_ref("logging.handlers:RotatingFileHandler")
+ is RotatingFileHandler
+ )
def test_complex_path(self):
- pkg1 = ModuleType('pkg1')
- pkg1.pkg2 = 'blah'
- pkg2 = ModuleType('pkg1.pkg2')
+ pkg1 = ModuleType("pkg1")
+ pkg1.pkg2 = "blah"
+ pkg2 = ModuleType("pkg1.pkg2")
pkg2.varname = lambda: None
- sys.modules['pkg1'] = pkg1
- sys.modules['pkg1.pkg2'] = pkg2
- assert callable_from_ref('pkg1.pkg2:varname') == pkg2.varname
-
- @pytest.mark.parametrize('input,error', [
- (object(), TypeError),
- ('module', ValueError),
- ('module:blah', LookupError)
- ], ids=['raw object', 'module', 'module attribute'])
+ sys.modules["pkg1"] = pkg1
+ sys.modules["pkg1.pkg2"] = pkg2
+ assert callable_from_ref("pkg1.pkg2:varname") == pkg2.varname
+
+ @pytest.mark.parametrize(
+ "input,error",
+ [(object(), TypeError), ("module", ValueError), ("module:blah", LookupError)],
+ ids=["raw object", "module", "module attribute"],
+ )
def test_lookup_error(self, input, error):
pytest.raises(error, callable_from_ref, input)
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 4a1cc4e..b8c64a2 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -14,7 +14,14 @@ from pytest_mock import MockerFixture
from apscheduler.context import current_scheduler, current_worker, job_info
from apscheduler.enums import JobOutcome
from apscheduler.events import (
- Event, JobAdded, ScheduleAdded, ScheduleRemoved, SchedulerStarted, SchedulerStopped, TaskAdded)
+ Event,
+ JobAdded,
+ ScheduleAdded,
+ ScheduleRemoved,
+ SchedulerStarted,
+ SchedulerStopped,
+ TaskAdded,
+)
from apscheduler.exceptions import JobLookupError
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.schedulers.sync import Scheduler
@@ -33,17 +40,17 @@ pytestmark = pytest.mark.anyio
async def dummy_async_job(delay: float = 0, fail: bool = False) -> str:
await anyio.sleep(delay)
if fail:
- raise RuntimeError('failing as requested')
+ raise RuntimeError("failing as requested")
else:
- return 'returnvalue'
+ return "returnvalue"
def dummy_sync_job(delay: float = 0, fail: bool = False) -> str:
time.sleep(delay)
if fail:
- raise RuntimeError('failing as requested')
+ raise RuntimeError("failing as requested")
else:
- return 'returnvalue'
+ return "returnvalue"
class TestAsyncScheduler:
@@ -59,7 +66,7 @@ class TestAsyncScheduler:
scheduler.events.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
async with scheduler:
- await scheduler.add_schedule(dummy_async_job, trigger, id='foo')
+ await scheduler.add_schedule(dummy_async_job, trigger, id="foo")
with fail_after(3):
await event.wait()
@@ -70,24 +77,24 @@ class TestAsyncScheduler:
# Then the task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == 'test_schedulers:dummy_async_job'
+ assert received_event.task_id == "test_schedulers:dummy_async_job"
# Then a schedule was added
received_event = received_events.pop(0)
assert isinstance(received_event, ScheduleAdded)
- assert received_event.schedule_id == 'foo'
+ assert received_event.schedule_id == "foo"
# assert received_event.task_id == 'task_id'
# Then that schedule was processed and a job was added for it
received_event = received_events.pop(0)
assert isinstance(received_event, JobAdded)
- assert received_event.schedule_id == 'foo'
- assert received_event.task_id == 'test_schedulers:dummy_async_job'
+ assert received_event.schedule_id == "foo"
+ assert received_event.task_id == "test_schedulers:dummy_async_job"
# Then the schedule was removed since the trigger had been exhausted
received_event = received_events.pop(0)
assert isinstance(received_event, ScheduleRemoved)
- assert received_event.schedule_id == 'foo'
+ assert received_event.schedule_id == "foo"
# Finally, the scheduler was stopped
received_event = received_events.pop(0)
@@ -96,12 +103,17 @@ class TestAsyncScheduler:
# There should be no more events on the list
assert not received_events
- @pytest.mark.parametrize('max_jitter, expected_upper_bound', [
- pytest.param(2, 2, id='within'),
- pytest.param(4, 2.999999, id='exceed')
- ])
- async def test_jitter(self, mocker: MockerFixture, timezone: ZoneInfo, max_jitter: float,
- expected_upper_bound: float) -> None:
+ @pytest.mark.parametrize(
+ "max_jitter, expected_upper_bound",
+ [pytest.param(2, 2, id="within"), pytest.param(4, 2.999999, id="exceed")],
+ )
+ async def test_jitter(
+ self,
+ mocker: MockerFixture,
+ timezone: ZoneInfo,
+ max_jitter: float,
+ expected_upper_bound: float,
+ ) -> None:
job_id: UUID | None = None
def job_added_listener(event: Event) -> None:
@@ -112,14 +124,15 @@ class TestAsyncScheduler:
jitter = 1.569374
orig_start_time = datetime.now(timezone) - timedelta(seconds=1)
- fake_uniform = mocker.patch('random.uniform')
+ fake_uniform = mocker.patch("random.uniform")
fake_uniform.configure_mock(side_effect=lambda a, b: jitter)
async with AsyncScheduler(start_worker=False) as scheduler:
trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
job_added_event = anyio.Event()
scheduler.events.subscribe(job_added_listener, {JobAdded})
- schedule_id = await scheduler.add_schedule(dummy_async_job, trigger,
- max_jitter=max_jitter)
+ schedule_id = await scheduler.add_schedule(
+ dummy_async_job, trigger, max_jitter=max_jitter
+ )
schedule = await scheduler.get_schedule(schedule_id)
assert schedule.max_jitter == timedelta(seconds=max_jitter)
@@ -132,61 +145,72 @@ class TestAsyncScheduler:
# Check that the job was created with the proper amount of jitter in its scheduled time
jobs = await scheduler.data_store.get_jobs({job_id})
assert jobs[0].jitter == timedelta(seconds=jitter)
- assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(seconds=jitter)
+ assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(
+ seconds=jitter
+ )
assert jobs[0].original_scheduled_time == orig_start_time
async def test_get_job_result_success(self) -> None:
async with AsyncScheduler() as scheduler:
- job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2})
+ job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
assert result.outcome is JobOutcome.success
- assert result.return_value == 'returnvalue'
+ assert result.return_value == "returnvalue"
async def test_get_job_result_error(self) -> None:
async with AsyncScheduler() as scheduler:
- job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2, 'fail': True})
+ job_id = await scheduler.add_job(
+ dummy_async_job, kwargs={"delay": 0.2, "fail": True}
+ )
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
assert result.outcome is JobOutcome.error
assert isinstance(result.exception, RuntimeError)
- assert str(result.exception) == 'failing as requested'
+ assert str(result.exception) == "failing as requested"
async def test_get_job_result_nowait_not_yet_ready(self) -> None:
async with AsyncScheduler() as scheduler:
- job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2})
+ job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
with pytest.raises(JobLookupError):
await scheduler.get_job_result(job_id, wait=False)
async def test_run_job_success(self) -> None:
async with AsyncScheduler() as scheduler:
return_value = await scheduler.run_job(dummy_async_job)
- assert return_value == 'returnvalue'
+ assert return_value == "returnvalue"
async def test_run_job_failure(self) -> None:
async with AsyncScheduler() as scheduler:
- with pytest.raises(RuntimeError, match='failing as requested'):
- await scheduler.run_job(dummy_async_job, kwargs={'fail': True})
+ with pytest.raises(RuntimeError, match="failing as requested"):
+ await scheduler.run_job(dummy_async_job, kwargs={"fail": True})
async def test_contextvars(self) -> None:
def check_contextvars() -> None:
assert current_scheduler.get() is scheduler
assert current_worker.get() is scheduler.worker
info = job_info.get()
- assert info.task_id == 'task_id'
- assert info.schedule_id == 'foo'
+ assert info.task_id == "task_id"
+ assert info.schedule_id == "foo"
assert info.scheduled_fire_time == scheduled_fire_time
assert info.jitter == timedelta(seconds=2.16)
assert info.start_deadline == start_deadline
- assert info.tags == {'foo', 'bar'}
+ assert info.tags == {"foo", "bar"}
scheduled_fire_time = datetime.now(timezone.utc)
start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10)
async with AsyncScheduler() as scheduler:
- await scheduler.data_store.add_task(Task(id='task_id', func=check_contextvars))
- job = Job(task_id='task_id', schedule_id='foo',
- scheduled_fire_time=scheduled_fire_time, jitter=timedelta(seconds=2.16),
- start_deadline=start_deadline, tags={'foo', 'bar'})
+ await scheduler.data_store.add_task(
+ Task(id="task_id", func=check_contextvars)
+ )
+ job = Job(
+ task_id="task_id",
+ schedule_id="foo",
+ scheduled_fire_time=scheduled_fire_time,
+ jitter=timedelta(seconds=2.16),
+ start_deadline=start_deadline,
+ tags={"foo", "bar"},
+ )
await scheduler.data_store.add_job(job)
result = await scheduler.get_job_result(job.id)
if result.outcome is JobOutcome.error:
@@ -208,7 +232,7 @@ class TestSyncScheduler:
scheduler.events.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
with scheduler:
- scheduler.add_schedule(dummy_sync_job, trigger, id='foo')
+ scheduler.add_schedule(dummy_sync_job, trigger, id="foo")
event.wait(3)
# The scheduler was first started
@@ -218,23 +242,23 @@ class TestSyncScheduler:
# Then the task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == 'test_schedulers:dummy_sync_job'
+ assert received_event.task_id == "test_schedulers:dummy_sync_job"
# Then a schedule was added
received_event = received_events.pop(0)
assert isinstance(received_event, ScheduleAdded)
- assert received_event.schedule_id == 'foo'
+ assert received_event.schedule_id == "foo"
# Then that schedule was processed and a job was added for it
received_event = received_events.pop(0)
assert isinstance(received_event, JobAdded)
- assert received_event.schedule_id == 'foo'
- assert received_event.task_id == 'test_schedulers:dummy_sync_job'
+ assert received_event.schedule_id == "foo"
+ assert received_event.task_id == "test_schedulers:dummy_sync_job"
# Then the schedule was removed since the trigger had been exhausted
received_event = received_events.pop(0)
assert isinstance(received_event, ScheduleRemoved)
- assert received_event.schedule_id == 'foo'
+ assert received_event.schedule_id == "foo"
# Finally, the scheduler was stopped
received_event = received_events.pop(0)
@@ -243,12 +267,17 @@ class TestSyncScheduler:
# There should be no more events on the list
assert not received_events
- @pytest.mark.parametrize('max_jitter, expected_upper_bound', [
- pytest.param(2, 2, id='within'),
- pytest.param(4, 2.999999, id='exceed')
- ])
- def test_jitter(self, mocker: MockerFixture, timezone: ZoneInfo, max_jitter: float,
- expected_upper_bound: float) -> None:
+ @pytest.mark.parametrize(
+ "max_jitter, expected_upper_bound",
+ [pytest.param(2, 2, id="within"), pytest.param(4, 2.999999, id="exceed")],
+ )
+ def test_jitter(
+ self,
+ mocker: MockerFixture,
+ timezone: ZoneInfo,
+ max_jitter: float,
+ expected_upper_bound: float,
+ ) -> None:
job_id: UUID | None = None
def job_added_listener(event: Event) -> None:
@@ -259,13 +288,15 @@ class TestSyncScheduler:
jitter = 1.569374
orig_start_time = datetime.now(timezone) - timedelta(seconds=1)
- fake_uniform = mocker.patch('random.uniform')
+ fake_uniform = mocker.patch("random.uniform")
fake_uniform.configure_mock(side_effect=lambda a, b: jitter)
with Scheduler(start_worker=False) as scheduler:
trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
job_added_event = threading.Event()
scheduler.events.subscribe(job_added_listener, {JobAdded})
- schedule_id = scheduler.add_schedule(dummy_async_job, trigger, max_jitter=max_jitter)
+ schedule_id = scheduler.add_schedule(
+ dummy_async_job, trigger, max_jitter=max_jitter
+ )
schedule = scheduler.get_schedule(schedule_id)
assert schedule.max_jitter == timedelta(seconds=max_jitter)
@@ -277,7 +308,9 @@ class TestSyncScheduler:
# Check that the job was created with the proper amount of jitter in its scheduled time
jobs = scheduler.data_store.get_jobs({job_id})
assert jobs[0].jitter == timedelta(seconds=jitter)
- assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(seconds=jitter)
+ assert jobs[0].scheduled_fire_time == orig_start_time + timedelta(
+ seconds=jitter
+ )
assert jobs[0].original_scheduled_time == orig_start_time
def test_get_job_result(self) -> None:
@@ -285,52 +318,59 @@ class TestSyncScheduler:
job_id = scheduler.add_job(dummy_sync_job)
result = scheduler.get_job_result(job_id)
assert result.outcome is JobOutcome.success
- assert result.return_value == 'returnvalue'
+ assert result.return_value == "returnvalue"
def test_get_job_result_error(self) -> None:
with Scheduler() as scheduler:
- job_id = scheduler.add_job(dummy_sync_job, kwargs={'delay': 0.2, 'fail': True})
+ job_id = scheduler.add_job(
+ dummy_sync_job, kwargs={"delay": 0.2, "fail": True}
+ )
result = scheduler.get_job_result(job_id)
assert result.job_id == job_id
assert result.outcome is JobOutcome.error
assert isinstance(result.exception, RuntimeError)
- assert str(result.exception) == 'failing as requested'
+ assert str(result.exception) == "failing as requested"
def test_get_job_result_nowait_not_yet_ready(self) -> None:
with Scheduler() as scheduler:
- job_id = scheduler.add_job(dummy_sync_job, kwargs={'delay': 0.2})
+ job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2})
with pytest.raises(JobLookupError):
scheduler.get_job_result(job_id, wait=False)
def test_run_job_success(self) -> None:
with Scheduler() as scheduler:
return_value = scheduler.run_job(dummy_sync_job)
- assert return_value == 'returnvalue'
+ assert return_value == "returnvalue"
def test_run_job_failure(self) -> None:
with Scheduler() as scheduler:
- with pytest.raises(RuntimeError, match='failing as requested'):
- scheduler.run_job(dummy_sync_job, kwargs={'fail': True})
+ with pytest.raises(RuntimeError, match="failing as requested"):
+ scheduler.run_job(dummy_sync_job, kwargs={"fail": True})
def test_contextvars(self) -> None:
def check_contextvars() -> None:
assert current_scheduler.get() is scheduler
assert current_worker.get() is scheduler.worker
info = job_info.get()
- assert info.task_id == 'task_id'
- assert info.schedule_id == 'foo'
+ assert info.task_id == "task_id"
+ assert info.schedule_id == "foo"
assert info.scheduled_fire_time == scheduled_fire_time
assert info.jitter == timedelta(seconds=2.16)
assert info.start_deadline == start_deadline
- assert info.tags == {'foo', 'bar'}
+ assert info.tags == {"foo", "bar"}
scheduled_fire_time = datetime.now(timezone.utc)
start_deadline = datetime.now(timezone.utc) + timedelta(seconds=10)
with Scheduler() as scheduler:
- scheduler.data_store.add_task(Task(id='task_id', func=check_contextvars))
- job = Job(task_id='task_id', schedule_id='foo',
- scheduled_fire_time=scheduled_fire_time, jitter=timedelta(seconds=2.16),
- start_deadline=start_deadline, tags={'foo', 'bar'})
+ scheduler.data_store.add_task(Task(id="task_id", func=check_contextvars))
+ job = Job(
+ task_id="task_id",
+ schedule_id="foo",
+ scheduled_fire_time=scheduled_fire_time,
+ jitter=timedelta(seconds=2.16),
+ start_deadline=start_deadline,
+ tags={"foo", "bar"},
+ )
scheduler.data_store.add_job(job)
result = scheduler.get_job_result(job.id)
if result.outcome is JobOutcome.error:
diff --git a/tests/test_workers.py b/tests/test_workers.py
index dcfafb1..6e5568f 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -12,7 +12,14 @@ from apscheduler.abc import Job
from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.enums import JobOutcome
from apscheduler.events import (
- Event, JobAcquired, JobAdded, JobReleased, TaskAdded, WorkerStarted, WorkerStopped)
+ Event,
+ JobAcquired,
+ JobAdded,
+ JobReleased,
+ TaskAdded,
+ WorkerStarted,
+ WorkerStopped,
+)
from apscheduler.structures import Task
from apscheduler.workers.async_ import AsyncWorker
from apscheduler.workers.sync import Worker
@@ -22,26 +29,30 @@ pytestmark = pytest.mark.anyio
def sync_func(*args, fail: bool, **kwargs):
if fail:
- raise Exception('failing as requested')
+ raise Exception("failing as requested")
else:
return args, kwargs
async def async_func(*args, fail: bool, **kwargs):
if fail:
- raise Exception('failing as requested')
+ raise Exception("failing as requested")
else:
return args, kwargs
def fail_func():
- pytest.fail('This function should never be run')
+ pytest.fail("This function should never be run")
class TestAsyncWorker:
- @pytest.mark.parametrize('target_func', [sync_func, async_func], ids=['sync', 'async'])
- @pytest.mark.parametrize('fail', [False, True], ids=['success', 'fail'])
- async def test_run_job_nonscheduled_success(self, target_func: Callable, fail: bool) -> None:
+ @pytest.mark.parametrize(
+ "target_func", [sync_func, async_func], ids=["sync", "async"]
+ )
+ @pytest.mark.parametrize("fail", [False, True], ids=["success", "fail"])
+ async def test_run_job_nonscheduled_success(
+ self, target_func: Callable, fail: bool
+ ) -> None:
def listener(received_event: Event):
received_events.append(received_event)
if len(received_events) == 5:
@@ -53,8 +64,8 @@ class TestAsyncWorker:
worker = AsyncWorker(data_store)
worker.events.subscribe(listener)
async with worker:
- await worker.data_store.add_task(Task(id='task_id', func=target_func))
- job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
+ await worker.data_store.add_task(Task(id="task_id", func=target_func))
+ job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail})
await worker.data_store.add_job(job)
with fail_after(3):
await event.wait()
@@ -66,13 +77,13 @@ class TestAsyncWorker:
# Then the task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == 'task_id'
+ assert received_event.task_id == "task_id"
# Then a job was added
received_event = received_events.pop(0)
assert isinstance(received_event, JobAdded)
assert received_event.job_id == job.id
- assert received_event.task_id == 'task_id'
+ assert received_event.task_id == "task_id"
assert received_event.schedule_id is None
# Then the job was started
@@ -111,10 +122,13 @@ class TestAsyncWorker:
worker = AsyncWorker(data_store)
worker.events.subscribe(listener)
async with worker:
- await worker.data_store.add_task(Task(id='task_id', func=fail_func))
- job = Job(task_id='task_id', schedule_id='foo',
- scheduled_fire_time=scheduled_start_time,
- start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc))
+ await worker.data_store.add_task(Task(id="task_id", func=fail_func))
+ job = Job(
+ task_id="task_id",
+ schedule_id="foo",
+ scheduled_fire_time=scheduled_start_time,
+ start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc),
+ )
await worker.data_store.add_job(job)
with fail_after(3):
await event.wait()
@@ -126,14 +140,14 @@ class TestAsyncWorker:
# Then the task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == 'task_id'
+ assert received_event.task_id == "task_id"
# Then a job was added
received_event = received_events.pop(0)
assert isinstance(received_event, JobAdded)
assert received_event.job_id == job.id
- assert received_event.task_id == 'task_id'
- assert received_event.schedule_id == 'foo'
+ assert received_event.task_id == "task_id"
+ assert received_event.schedule_id == "foo"
# The worker acquired the job
received_event = received_events.pop(0)
@@ -157,7 +171,7 @@ class TestAsyncWorker:
class TestSyncWorker:
- @pytest.mark.parametrize('fail', [False, True], ids=['success', 'fail'])
+ @pytest.mark.parametrize("fail", [False, True], ids=["success", "fail"])
def test_run_job_nonscheduled(self, fail: bool) -> None:
def listener(received_event: Event):
received_events.append(received_event)
@@ -170,8 +184,8 @@ class TestSyncWorker:
worker = Worker(data_store)
worker.events.subscribe(listener)
with worker:
- worker.data_store.add_task(Task(id='task_id', func=sync_func))
- job = Job(task_id='task_id', args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
+ worker.data_store.add_task(Task(id="task_id", func=sync_func))
+ job = Job(task_id="task_id", args=(1, 2), kwargs={"x": "foo", "fail": fail})
worker.data_store.add_job(job)
event.wait(5)
@@ -182,13 +196,13 @@ class TestSyncWorker:
# Then the task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == 'task_id'
+ assert received_event.task_id == "task_id"
# Then a job was added
received_event = received_events.pop(0)
assert isinstance(received_event, JobAdded)
assert received_event.job_id == job.id
- assert received_event.task_id == 'task_id'
+ assert received_event.task_id == "task_id"
assert received_event.schedule_id is None
# Then the job was started
@@ -227,10 +241,13 @@ class TestSyncWorker:
worker = Worker(data_store)
worker.events.subscribe(listener)
with worker:
- worker.data_store.add_task(Task(id='task_id', func=fail_func))
- job = Job(task_id='task_id', schedule_id='foo',
- scheduled_fire_time=scheduled_start_time,
- start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc))
+ worker.data_store.add_task(Task(id="task_id", func=fail_func))
+ job = Job(
+ task_id="task_id",
+ schedule_id="foo",
+ scheduled_fire_time=scheduled_start_time,
+ start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc),
+ )
worker.data_store.add_job(job)
event.wait(3)
@@ -241,14 +258,14 @@ class TestSyncWorker:
# Then the task was added
received_event = received_events.pop(0)
assert isinstance(received_event, TaskAdded)
- assert received_event.task_id == 'task_id'
+ assert received_event.task_id == "task_id"
# Then a job was added
received_event = received_events.pop(0)
assert isinstance(received_event, JobAdded)
assert received_event.job_id == job.id
- assert received_event.task_id == 'task_id'
- assert received_event.schedule_id == 'foo'
+ assert received_event.task_id == "task_id"
+ assert received_event.schedule_id == "foo"
# The worker acquired the job
received_event = received_events.pop(0)
diff --git a/tests/triggers/test_calendarinterval.py b/tests/triggers/test_calendarinterval.py
index dd45212..4466c34 100644
--- a/tests/triggers/test_calendarinterval.py
+++ b/tests/triggers/test_calendarinterval.py
@@ -9,20 +9,27 @@ from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
def test_bad_interval(timezone):
exc = pytest.raises(ValueError, CalendarIntervalTrigger, timezone=timezone)
- exc.match('interval must be at least 1 day long')
+ exc.match("interval must be at least 1 day long")
def test_bad_start_end_dates(timezone):
- exc = pytest.raises(ValueError, CalendarIntervalTrigger, days=1,
- start_date=date(2016, 3, 4), end_date=date(2016, 3, 3), timezone=timezone)
- exc.match('end_date cannot be earlier than start_date')
+ exc = pytest.raises(
+ ValueError,
+ CalendarIntervalTrigger,
+ days=1,
+ start_date=date(2016, 3, 4),
+ end_date=date(2016, 3, 3),
+ timezone=timezone,
+ )
+ exc.match("end_date cannot be earlier than start_date")
def test_end_date(timezone, serializer):
"""Test that end_date is respected."""
start_end_date = date(2020, 12, 31)
- trigger = CalendarIntervalTrigger(days=1, start_date=start_end_date, end_date=start_end_date,
- timezone=timezone)
+ trigger = CalendarIntervalTrigger(
+ days=1, start_date=start_end_date, end_date=start_end_date, timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -36,8 +43,9 @@ def test_missing_time(timezone, serializer):
skipped entirely.
"""
- trigger = CalendarIntervalTrigger(days=1, hour=2, minute=30, start_date=date(2016, 3, 27),
- timezone=timezone)
+ trigger = CalendarIntervalTrigger(
+ days=1, hour=2, minute=30, start_date=date(2016, 3, 27), timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -50,8 +58,9 @@ def test_repeated_time(timezone, serializer):
is executed on the earlier occurrence of that time.
"""
- trigger = CalendarIntervalTrigger(days=2, hour=2, minute=30, start_date=date(2016, 10, 30),
- timezone=timezone)
+ trigger = CalendarIntervalTrigger(
+ days=2, hour=2, minute=30, start_date=date(2016, 10, 30), timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -60,7 +69,9 @@ def test_repeated_time(timezone, serializer):
def test_nonexistent_days(timezone, serializer):
"""Test that invalid dates are skipped."""
- trigger = CalendarIntervalTrigger(months=1, start_date=date(2016, 3, 31), timezone=timezone)
+ trigger = CalendarIntervalTrigger(
+ months=1, start_date=date(2016, 3, 31), timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -70,11 +81,21 @@ def test_nonexistent_days(timezone, serializer):
def test_repr(timezone, serializer):
trigger = CalendarIntervalTrigger(
- years=1, months=5, weeks=6, days=8, hour=3, second=8, start_date=date(2016, 3, 5),
- end_date=date(2020, 12, 25), timezone=timezone)
+ years=1,
+ months=5,
+ weeks=6,
+ days=8,
+ hour=3,
+ second=8,
+ start_date=date(2016, 3, 5),
+ end_date=date(2020, 12, 25),
+ timezone=timezone,
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
- assert repr(trigger) == ("CalendarIntervalTrigger(years=1, months=5, weeks=6, days=8, "
- "time='03:00:08', start_date='2016-03-05', end_date='2020-12-25', "
- "timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CalendarIntervalTrigger(years=1, months=5, weeks=6, days=8, "
+ "time='03:00:08', start_date='2016-03-05', end_date='2020-12-25', "
+ "timezone='Europe/Berlin')"
+ )
diff --git a/tests/triggers/test_combining.py b/tests/triggers/test_combining.py
index fbdbfd0..9a59ecb 100644
--- a/tests/triggers/test_combining.py
+++ b/tests/triggers/test_combining.py
@@ -11,11 +11,13 @@ from apscheduler.triggers.interval import IntervalTrigger
class TestAndTrigger:
- @pytest.mark.parametrize('threshold', [1, 0])
+ @pytest.mark.parametrize("threshold", [1, 0])
def test_two_datetriggers(self, timezone, serializer, threshold):
date1 = datetime(2020, 5, 16, 14, 17, 30, 254212, tzinfo=timezone)
date2 = datetime(2020, 5, 16, 14, 17, 31, 254212, tzinfo=timezone)
- trigger = AndTrigger([DateTrigger(date1), DateTrigger(date2)], threshold=threshold)
+ trigger = AndTrigger(
+ [DateTrigger(date1), DateTrigger(date2)], threshold=threshold
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -27,10 +29,14 @@ class TestAndTrigger:
def test_max_iterations(self, timezone, serializer):
start_time = datetime(2020, 5, 16, 14, 17, 30, 254212, tzinfo=timezone)
- trigger = AndTrigger([
- IntervalTrigger(seconds=4, start_time=start_time),
- IntervalTrigger(seconds=4, start_time=start_time + timedelta(seconds=2))
- ])
+ trigger = AndTrigger(
+ [
+ IntervalTrigger(seconds=4, start_time=start_time),
+ IntervalTrigger(
+ seconds=4, start_time=start_time + timedelta(seconds=2)
+ ),
+ ]
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -38,10 +44,14 @@ class TestAndTrigger:
def test_repr(self, timezone, serializer):
start_time = datetime(2020, 5, 16, 14, 17, 30, 254212, tzinfo=timezone)
- trigger = AndTrigger([
- IntervalTrigger(seconds=4, start_time=start_time),
- IntervalTrigger(seconds=4, start_time=start_time + timedelta(seconds=2))
- ])
+ trigger = AndTrigger(
+ [
+ IntervalTrigger(seconds=4, start_time=start_time),
+ IntervalTrigger(
+ seconds=4, start_time=start_time + timedelta(seconds=2)
+ ),
+ ]
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -68,10 +78,12 @@ class TestOrTrigger:
start_time = datetime(2020, 5, 16, 14, 17, 30, 254212, tzinfo=timezone)
end_time1 = start_time + timedelta(seconds=16)
end_time2 = start_time + timedelta(seconds=18)
- trigger = OrTrigger([
- IntervalTrigger(seconds=4, start_time=start_time, end_time=end_time1),
- IntervalTrigger(seconds=6, start_time=start_time, end_time=end_time2)
- ])
+ trigger = OrTrigger(
+ [
+ IntervalTrigger(seconds=4, start_time=start_time, end_time=end_time1),
+ IntervalTrigger(seconds=6, start_time=start_time, end_time=end_time2),
+ ]
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -90,5 +102,7 @@ class TestOrTrigger:
date1 = datetime(2020, 5, 16, 14, 17, 30, 254212, tzinfo=timezone)
date2 = datetime(2020, 5, 18, 15, 1, 53, 940564, tzinfo=timezone)
trigger = OrTrigger([DateTrigger(date1), DateTrigger(date2)])
- assert repr(trigger) == ("OrTrigger([DateTrigger('2020-05-16 14:17:30.254212+02:00'), "
- "DateTrigger('2020-05-18 15:01:53.940564+02:00')])")
+ assert repr(trigger) == (
+ "OrTrigger([DateTrigger('2020-05-16 14:17:30.254212+02:00'), "
+ "DateTrigger('2020-05-18 15:01:53.940564+02:00')])"
+ )
diff --git a/tests/triggers/test_cron.py b/tests/triggers/test_cron.py
index 578dd7d..924475a 100644
--- a/tests/triggers/test_cron.py
+++ b/tests/triggers/test_cron.py
@@ -14,55 +14,81 @@ else:
def test_invalid_expression():
- exc = pytest.raises(ValueError, CronTrigger, year='2009-fault')
+ exc = pytest.raises(ValueError, CronTrigger, year="2009-fault")
exc.match("Unrecognized expression '2009-fault' for field 'year'")
def test_invalid_step():
- exc = pytest.raises(ValueError, CronTrigger, year='2009/0')
+ exc = pytest.raises(ValueError, CronTrigger, year="2009/0")
exc.match("Step must be higher than 0")
def test_invalid_range():
- exc = pytest.raises(ValueError, CronTrigger, year='2009-2008')
+ exc = pytest.raises(ValueError, CronTrigger, year="2009-2008")
exc.match("The minimum value in a range must not be higher than the maximum")
-@pytest.mark.parametrize('expr', ['fab', 'jan-fab'], ids=['start', 'end'])
+@pytest.mark.parametrize("expr", ["fab", "jan-fab"], ids=["start", "end"])
def test_invalid_month_name(expr):
exc = pytest.raises(ValueError, CronTrigger, month=expr)
exc.match("Invalid month name 'fab'")
-@pytest.mark.parametrize('expr', ['web', 'mon-web'], ids=['start', 'end'])
+@pytest.mark.parametrize("expr", ["web", "mon-web"], ids=["start", "end"])
def test_invalid_weekday_name(expr):
exc = pytest.raises(ValueError, CronTrigger, day_of_week=expr)
exc.match("Invalid weekday name 'web'")
def test_invalid_weekday_position_name():
- exc = pytest.raises(ValueError, CronTrigger, day='1st web')
+ exc = pytest.raises(ValueError, CronTrigger, day="1st web")
exc.match("Invalid weekday name 'web'")
-@pytest.mark.parametrize('values, expected', [
- (dict(day='*/31'), r"Error validating expression '\*/31': the step value \(31\) is higher "
- r"than the total range of the expression \(30\)"),
- (dict(day='4-6/3'), r"Error validating expression '4-6/3': the step value \(3\) is higher "
- r"than the total range of the expression \(2\)"),
- (dict(hour='0-24'), r"Error validating expression '0-24': the last value \(24\) is higher "
- r"than the maximum value \(23\)"),
- (dict(day='0-3'), r"Error validating expression '0-3': the first value \(0\) is lower "
- r"than the minimum value \(1\)")
-], ids=['too_large_step_all', 'too_large_step_range', 'too_high_last', 'too_low_first'])
+@pytest.mark.parametrize(
+ "values, expected",
+ [
+ (
+ dict(day="*/31"),
+ r"Error validating expression '\*/31': the step value \(31\) is higher "
+ r"than the total range of the expression \(30\)",
+ ),
+ (
+ dict(day="4-6/3"),
+ r"Error validating expression '4-6/3': the step value \(3\) is higher "
+ r"than the total range of the expression \(2\)",
+ ),
+ (
+ dict(hour="0-24"),
+ r"Error validating expression '0-24': the last value \(24\) is higher "
+ r"than the maximum value \(23\)",
+ ),
+ (
+ dict(day="0-3"),
+ r"Error validating expression '0-3': the first value \(0\) is lower "
+ r"than the minimum value \(1\)",
+ ),
+ ],
+ ids=[
+ "too_large_step_all",
+ "too_large_step_range",
+ "too_high_last",
+ "too_low_first",
+ ],
+)
def test_invalid_ranges(values, expected):
pytest.raises(ValueError, CronTrigger, **values).match(expected)
def test_cron_trigger_1(timezone, serializer):
start_time = datetime(2008, 12, 1, tzinfo=timezone)
- trigger = CronTrigger(year='2009/2', month='1-4/3', day='5-6', start_time=start_time,
- timezone=timezone)
+ trigger = CronTrigger(
+ year="2009/2",
+ month="1-4/3",
+ day="5-6",
+ start_time=start_time,
+ timezone=timezone,
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -71,15 +97,18 @@ def test_cron_trigger_1(timezone, serializer):
assert trigger.next() == datetime(2009, 4, 5, tzinfo=timezone)
assert trigger.next() == datetime(2009, 4, 6, tzinfo=timezone)
assert trigger.next() == datetime(2011, 1, 5, tzinfo=timezone)
- assert repr(trigger) == ("CronTrigger(year='2009/2', month='1-4/3', day='5-6', week='*', "
- "day_of_week='*', hour='0', minute='0', second='0', "
- "start_time='2008-12-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2009/2', month='1-4/3', day='5-6', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2008-12-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
def test_cron_trigger_2(timezone, serializer):
start_time = datetime(2009, 10, 14, tzinfo=timezone)
- trigger = CronTrigger(year='2009/2', month='1-3', day='5', start_time=start_time,
- timezone=timezone)
+ trigger = CronTrigger(
+ year="2009/2", month="1-3", day="5", start_time=start_time, timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -87,57 +116,83 @@ def test_cron_trigger_2(timezone, serializer):
assert trigger.next() == datetime(2011, 2, 5, tzinfo=timezone)
assert trigger.next() == datetime(2011, 3, 5, tzinfo=timezone)
assert trigger.next() == datetime(2013, 1, 5, tzinfo=timezone)
- assert repr(trigger) == ("CronTrigger(year='2009/2', month='1-3', day='5', week='*', "
- "day_of_week='*', hour='0', minute='0', second='0', "
- "start_time='2009-10-14T00:00:00+02:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2009/2', month='1-3', day='5', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2009-10-14T00:00:00+02:00', timezone='Europe/Berlin')"
+ )
def test_cron_trigger_3(timezone, serializer):
start_time = datetime(2009, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year='2009', month='feb-dec', hour='8-9', start_time=start_time,
- timezone=timezone)
+ trigger = CronTrigger(
+ year="2009",
+ month="feb-dec",
+ hour="8-9",
+ start_time=start_time,
+ timezone=timezone,
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() == datetime(2009, 2, 1, 8, tzinfo=timezone)
assert trigger.next() == datetime(2009, 2, 1, 9, tzinfo=timezone)
assert trigger.next() == datetime(2009, 2, 2, 8, tzinfo=timezone)
- assert repr(trigger) == ("CronTrigger(year='2009', month='feb-dec', day='*', week='*', "
- "day_of_week='*', hour='8-9', minute='0', second='0', "
- "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2009', month='feb-dec', day='*', week='*', "
+ "day_of_week='*', hour='8-9', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
def test_cron_trigger_4(timezone, serializer):
start_time = datetime(2012, 2, 1, tzinfo=timezone)
- trigger = CronTrigger(year='2012', month='2', day='last', start_time=start_time,
- timezone=timezone)
+ trigger = CronTrigger(
+ year="2012", month="2", day="last", start_time=start_time, timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() == datetime(2012, 2, 29, tzinfo=timezone)
- assert repr(trigger) == ("CronTrigger(year='2012', month='2', day='last', week='*', "
- "day_of_week='*', hour='0', minute='0', second='0', "
- "start_time='2012-02-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2012', month='2', day='last', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2012-02-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
-@pytest.mark.parametrize('expr', ['3-5', 'wed-fri'], ids=['numeric', 'text'])
+@pytest.mark.parametrize("expr", ["3-5", "wed-fri"], ids=["numeric", "text"])
def test_weekday_overlap(timezone, serializer, expr):
start_time = datetime(2009, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year=2009, month=1, day='6-10', day_of_week=expr, start_time=start_time,
- timezone=timezone)
+ trigger = CronTrigger(
+ year=2009,
+ month=1,
+ day="6-10",
+ day_of_week=expr,
+ start_time=start_time,
+ timezone=timezone,
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() == datetime(2009, 1, 7, tzinfo=timezone)
- assert repr(trigger) == ("CronTrigger(year='2009', month='1', day='6-10', week='*', "
- "day_of_week='wed-fri', hour='0', minute='0', second='0', "
- "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2009', month='1', day='6-10', week='*', "
+ "day_of_week='wed-fri', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
def test_weekday_range(timezone, serializer):
start_time = datetime(2020, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year=2020, month=1, week=1, day_of_week='fri-sun', start_time=start_time,
- timezone=timezone)
+ trigger = CronTrigger(
+ year=2020,
+ month=1,
+ week=1,
+ day_of_week="fri-sun",
+ start_time=start_time,
+ timezone=timezone,
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -145,23 +200,29 @@ def test_weekday_range(timezone, serializer):
assert trigger.next() == datetime(2020, 1, 4, tzinfo=timezone)
assert trigger.next() == datetime(2020, 1, 5, tzinfo=timezone)
assert trigger.next() is None
- assert repr(trigger) == ("CronTrigger(year='2020', month='1', day='*', week='1', "
- "day_of_week='fri-sun', hour='0', minute='0', second='0', "
- "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2020', month='1', day='*', week='1', "
+ "day_of_week='fri-sun', hour='0', minute='0', second='0', "
+ "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
def test_last_weekday(timezone, serializer):
start_time = datetime(2020, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year=2020, day='last sun', start_time=start_time, timezone=timezone)
+ trigger = CronTrigger(
+ year=2020, day="last sun", start_time=start_time, timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() == datetime(2020, 1, 26, tzinfo=timezone)
assert trigger.next() == datetime(2020, 2, 23, tzinfo=timezone)
assert trigger.next() == datetime(2020, 3, 29, tzinfo=timezone)
- assert repr(trigger) == ("CronTrigger(year='2020', month='*', day='last sun', week='*', "
- "day_of_week='*', hour='0', minute='0', second='0', "
- "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2020', month='*', day='last sun', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
def test_increment_weekday(timezone, serializer):
@@ -171,14 +232,16 @@ def test_increment_weekday(timezone, serializer):
"""
start_time = datetime(2009, 9, 25, 7, tzinfo=timezone)
- trigger = CronTrigger(hour='5-6', start_time=start_time, timezone=timezone)
+ trigger = CronTrigger(hour="5-6", start_time=start_time, timezone=timezone)
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() == datetime(2009, 9, 26, 5, tzinfo=timezone)
- assert repr(trigger) == ("CronTrigger(year='*', month='*', day='*', week='*', "
- "day_of_week='*', hour='5-6', minute='0', second='0', "
- "start_time='2009-09-25T07:00:00+02:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='*', month='*', day='*', week='*', "
+ "day_of_week='*', hour='5-6', minute='0', second='0', "
+ "start_time='2009-09-25T07:00:00+02:00', timezone='Europe/Berlin')"
+ )
def test_month_rollover(timezone, serializer):
@@ -191,52 +254,69 @@ def test_month_rollover(timezone, serializer):
assert trigger.next() == datetime(2016, 4, 30, tzinfo=timezone)
-@pytest.mark.parametrize('weekday', ['1,0', 'mon,sun'], ids=['numeric', 'text'])
+@pytest.mark.parametrize("weekday", ["1,0", "mon,sun"], ids=["numeric", "text"])
def test_weekday_nomatch(timezone, serializer, weekday):
start_time = datetime(2009, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year=2009, month=1, day='6-10', day_of_week=weekday,
- start_time=start_time, timezone=timezone)
+ trigger = CronTrigger(
+ year=2009,
+ month=1,
+ day="6-10",
+ day_of_week=weekday,
+ start_time=start_time,
+ timezone=timezone,
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() is None
- assert repr(trigger) == ("CronTrigger(year='2009', month='1', day='6-10', week='*', "
- "day_of_week='mon,sun', hour='0', minute='0', second='0', "
- "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2009', month='1', day='6-10', week='*', "
+ "day_of_week='mon,sun', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
def test_weekday_positional(timezone, serializer):
start_time = datetime(2009, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year=2009, month=1, day='4th wed', start_time=start_time,
- timezone=timezone)
+ trigger = CronTrigger(
+ year=2009, month=1, day="4th wed", start_time=start_time, timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() == datetime(2009, 1, 28, tzinfo=timezone)
- assert repr(trigger) == ("CronTrigger(year='2009', month='1', day='4th wed', week='*', "
- "day_of_week='*', hour='0', minute='0', second='0', "
- "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2009', month='1', day='4th wed', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
def test_end_time(timezone, serializer):
"""Test that next() won't produce"""
start_time = datetime(2014, 4, 13, 2, tzinfo=timezone)
end_time = datetime(2014, 4, 13, 4, tzinfo=timezone)
- trigger = CronTrigger(hour=4, start_time=start_time, end_time=end_time, timezone=timezone)
+ trigger = CronTrigger(
+ hour=4, start_time=start_time, end_time=end_time, timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() == datetime(2014, 4, 13, 4, tzinfo=timezone)
assert trigger.next() is None
- assert repr(trigger) == ("CronTrigger(year='*', month='*', day='*', week='*', "
- "day_of_week='*', hour='4', minute='0', second='0', "
- "start_time='2014-04-13T02:00:00+02:00', "
- "end_time='2014-04-13T04:00:00+02:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='*', month='*', day='*', week='*', "
+ "day_of_week='*', hour='4', minute='0', second='0', "
+ "start_time='2014-04-13T02:00:00+02:00', "
+ "end_time='2014-04-13T04:00:00+02:00', timezone='Europe/Berlin')"
+ )
def test_week_1(timezone, serializer):
start_time = datetime(2009, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year=2009, month=2, week=8, start_time=start_time, timezone=timezone)
+ trigger = CronTrigger(
+ year=2009, month=2, week=8, start_time=start_time, timezone=timezone
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -244,100 +324,163 @@ def test_week_1(timezone, serializer):
assert trigger.next() == datetime(2009, 2, day, tzinfo=timezone)
assert trigger.next() is None
- assert repr(trigger) == ("CronTrigger(year='2009', month='2', day='*', week='8', "
- "day_of_week='*', hour='0', minute='0', second='0', "
- "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2009', month='2', day='*', week='8', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
-@pytest.mark.parametrize('weekday', [3, 'wed'], ids=['numeric', 'text'])
+@pytest.mark.parametrize("weekday", [3, "wed"], ids=["numeric", "text"])
def test_week_2(timezone, serializer, weekday):
start_time = datetime(2009, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year=2009, week=15, day_of_week=weekday, start_time=start_time,
- timezone=timezone)
+ trigger = CronTrigger(
+ year=2009,
+ week=15,
+ day_of_week=weekday,
+ start_time=start_time,
+ timezone=timezone,
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
assert trigger.next() == datetime(2009, 4, 8, tzinfo=timezone)
assert trigger.next() is None
- assert repr(trigger) == ("CronTrigger(year='2009', month='*', day='*', week='15', "
- "day_of_week='wed', hour='0', minute='0', second='0', "
- "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ assert repr(trigger) == (
+ "CronTrigger(year='2009', month='*', day='*', week='15', "
+ "day_of_week='wed', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
@pytest.mark.parametrize(
- 'trigger_args, start_time, start_time_fold, correct_next_date,' 'correct_next_date_fold',
+ "trigger_args, start_time, start_time_fold, correct_next_date,"
+ "correct_next_date_fold",
[
- ({'hour': 8}, datetime(2013, 3, 9, 12), 0, datetime(2013, 3, 10, 8), 0),
- ({'hour': 8}, datetime(2013, 11, 2, 12), 0, datetime(2013, 11, 3, 8), 0),
- ({'minute': '*/30'}, datetime(2013, 3, 10, 1, 35), 0, datetime(2013, 3, 10, 3), 0),
- ({'minute': '*/30'}, datetime(2013, 11, 3, 1, 35), 0, datetime(2013, 11, 3, 1), 1)
- ], ids=['absolute_spring', 'absolute_autumn', 'interval_spring', 'interval_autumn'])
-def test_dst_change(trigger_args, start_time, start_time_fold, correct_next_date,
- correct_next_date_fold, serializer):
+ ({"hour": 8}, datetime(2013, 3, 9, 12), 0, datetime(2013, 3, 10, 8), 0),
+ ({"hour": 8}, datetime(2013, 11, 2, 12), 0, datetime(2013, 11, 3, 8), 0),
+ (
+ {"minute": "*/30"},
+ datetime(2013, 3, 10, 1, 35),
+ 0,
+ datetime(2013, 3, 10, 3),
+ 0,
+ ),
+ (
+ {"minute": "*/30"},
+ datetime(2013, 11, 3, 1, 35),
+ 0,
+ datetime(2013, 11, 3, 1),
+ 1,
+ ),
+ ],
+ ids=["absolute_spring", "absolute_autumn", "interval_spring", "interval_autumn"],
+)
+def test_dst_change(
+ trigger_args,
+ start_time,
+ start_time_fold,
+ correct_next_date,
+ correct_next_date_fold,
+ serializer,
+):
"""
Making sure that CronTrigger works correctly when crossing the DST switch threshold.
Note that you should explicitly compare datetimes as strings to avoid the internal datetime
comparison which would test for equality in the UTC timezone.
"""
- timezone = ZoneInfo('US/Eastern')
+ timezone = ZoneInfo("US/Eastern")
start_time = start_time.replace(tzinfo=timezone, fold=start_time_fold)
trigger = CronTrigger(timezone=timezone, start_time=start_time, **trigger_args)
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
- assert trigger.next() == correct_next_date.replace(tzinfo=timezone,
- fold=correct_next_date_fold)
+ assert trigger.next() == correct_next_date.replace(
+ tzinfo=timezone, fold=correct_next_date_fold
+ )
def test_zero_value(timezone):
start_time = datetime(2020, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year=2009, month=2, hour=0, start_time=start_time, timezone=timezone)
- assert repr(trigger) == ("CronTrigger(year='2009', month='2', day='*', week='*', "
- "day_of_week='*', hour='0', minute='0', second='0', "
- "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')")
+ trigger = CronTrigger(
+ year=2009, month=2, hour=0, start_time=start_time, timezone=timezone
+ )
+ assert repr(trigger) == (
+ "CronTrigger(year='2009', month='2', day='*', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2020-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
def test_year_list(timezone, serializer):
start_time = datetime(2009, 1, 1, tzinfo=timezone)
- trigger = CronTrigger(year='2009,2008', start_time=start_time, timezone=timezone)
- assert repr(trigger) == "CronTrigger(year='2009,2008', month='1', day='1', week='*', " \
- "day_of_week='*', hour='0', minute='0', second='0', " \
- "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ trigger = CronTrigger(year="2009,2008", start_time=start_time, timezone=timezone)
+ assert (
+ repr(trigger) == "CronTrigger(year='2009,2008', month='1', day='1', week='*', "
+ "day_of_week='*', hour='0', minute='0', second='0', "
+ "start_time='2009-01-01T00:00:00+01:00', timezone='Europe/Berlin')"
+ )
assert trigger.next() == datetime(2009, 1, 1, tzinfo=timezone)
assert trigger.next() is None
-@pytest.mark.parametrize('expr, expected_repr', [
- ('* * * * *',
- "CronTrigger(year='*', month='*', day='*', week='*', day_of_week='*', hour='*', minute='*', "
- "second='0', start_time='2020-05-19T19:53:22+02:00', timezone='Europe/Berlin')"),
- ('0-14 * 14-28 jul fri',
- "CronTrigger(year='*', month='jul', day='14-28', week='*', day_of_week='fri', hour='*', "
- "minute='0-14', second='0', start_time='2020-05-19T19:53:22+02:00', "
- "timezone='Europe/Berlin')"),
- (' 0-14 * 14-28 jul fri',
- "CronTrigger(year='*', month='jul', day='14-28', week='*', day_of_week='fri', hour='*', "
- "minute='0-14', second='0', start_time='2020-05-19T19:53:22+02:00', "
- "timezone='Europe/Berlin')"),
- ('* * * * 1-5',
- "CronTrigger(year='*', month='*', day='*', week='*', day_of_week='mon-fri', hour='*', "
- "minute='*', second='0', start_time='2020-05-19T19:53:22+02:00', "
- "timezone='Europe/Berlin')"),
- ('* * * * 0-3',
- "CronTrigger(year='*', month='*', day='*', week='*', day_of_week='mon-wed,sun', hour='*', "
- "minute='*', second='0', start_time='2020-05-19T19:53:22+02:00', "
- "timezone='Europe/Berlin')"),
- ('* * * * 6-1',
- "CronTrigger(year='*', month='*', day='*', week='*', day_of_week='mon,sat-sun', hour='*', "
- "minute='*', second='0', start_time='2020-05-19T19:53:22+02:00', "
- "timezone='Europe/Berlin')"),
- ('* * * * 6-7',
- "CronTrigger(year='*', month='*', day='*', week='*', day_of_week='sat-sun', hour='*', "
- "minute='*', second='0', start_time='2020-05-19T19:53:22+02:00', "
- "timezone='Europe/Berlin')"),
-], ids=['always', 'assorted', 'multiple_spaces_in_format', 'working_week', 'sunday_first',
- 'saturday_first', 'weekend'])
+@pytest.mark.parametrize(
+ "expr, expected_repr",
+ [
+ (
+ "* * * * *",
+ "CronTrigger(year='*', month='*', day='*', week='*', day_of_week='*',"
+ "hour='*', minute='*', second='0', start_time='2020-05-19T19:53:22+02:00', "
+ "timezone='Europe/Berlin')",
+ ),
+ (
+ "0-14 * 14-28 jul fri",
+ "CronTrigger(year='*', month='jul', day='14-28', week='*', "
+ "day_of_week='fri', hour='*', minute='0-14', second='0', "
+ "start_time='2020-05-19T19:53:22+02:00', timezone='Europe/Berlin')",
+ ),
+ (
+ " 0-14 * 14-28 jul fri",
+ "CronTrigger(year='*', month='jul', day='14-28', week='*', "
+ "day_of_week='fri', hour='*', minute='0-14', second='0', "
+ "start_time='2020-05-19T19:53:22+02:00', timezone='Europe/Berlin')",
+ ),
+ (
+ "* * * * 1-5",
+ "CronTrigger(year='*', month='*', day='*', week='*', "
+ "day_of_week='mon-fri', hour='*', minute='*', second='0', "
+ "start_time='2020-05-19T19:53:22+02:00', timezone='Europe/Berlin')",
+ ),
+ (
+ "* * * * 0-3",
+ "CronTrigger(year='*', month='*', day='*', week='*', "
+ "day_of_week='mon-wed,sun', hour='*', minute='*', second='0', "
+ "start_time='2020-05-19T19:53:22+02:00', timezone='Europe/Berlin')",
+ ),
+ (
+ "* * * * 6-1",
+ "CronTrigger(year='*', month='*', day='*', week='*', "
+ "day_of_week='mon,sat-sun', hour='*', minute='*', second='0', "
+ "start_time='2020-05-19T19:53:22+02:00', timezone='Europe/Berlin')",
+ ),
+ (
+ "* * * * 6-7",
+ "CronTrigger(year='*', month='*', day='*', week='*', "
+ "day_of_week='sat-sun', hour='*', minute='*', second='0', "
+ "start_time='2020-05-19T19:53:22+02:00', timezone='Europe/Berlin')",
+ ),
+ ],
+ ids=[
+ "always",
+ "assorted",
+ "multiple_spaces_in_format",
+ "working_week",
+ "sunday_first",
+ "saturday_first",
+ "weekend",
+ ],
+)
def test_from_crontab(expr, expected_repr, timezone, serializer):
trigger = CronTrigger.from_crontab(expr, timezone)
trigger.start_time = datetime(2020, 5, 19, 19, 53, 22, tzinfo=timezone)
@@ -348,5 +491,5 @@ def test_from_crontab(expr, expected_repr, timezone, serializer):
def test_from_crontab_wrong_number_of_fields():
- exc = pytest.raises(ValueError, CronTrigger.from_crontab, '*')
- exc.match('Wrong number of fields; got 1, expected 5')
+ exc = pytest.raises(ValueError, CronTrigger.from_crontab, "*")
+ exc.match("Wrong number of fields; got 1, expected 5")
diff --git a/tests/triggers/test_interval.py b/tests/triggers/test_interval.py
index 1778f61..04056fd 100644
--- a/tests/triggers/test_interval.py
+++ b/tests/triggers/test_interval.py
@@ -9,22 +9,25 @@ from apscheduler.triggers.interval import IntervalTrigger
def test_bad_interval():
exc = pytest.raises(ValueError, IntervalTrigger)
- exc.match('The time interval must be positive')
+ exc.match("The time interval must be positive")
def test_bad_end_time(timezone):
start_time = datetime(2020, 5, 16, tzinfo=timezone)
end_time = datetime(2020, 5, 15, tzinfo=timezone)
- exc = pytest.raises(ValueError, IntervalTrigger, seconds=1, start_time=start_time,
- end_time=end_time)
- exc.match('end_time cannot be earlier than start_time')
+ exc = pytest.raises(
+ ValueError, IntervalTrigger, seconds=1, start_time=start_time, end_time=end_time
+ )
+ exc.match("end_time cannot be earlier than start_time")
def test_end_time(timezone, serializer):
start_time = datetime(2020, 5, 16, 19, 32, 44, 649521, tzinfo=timezone)
end_time = datetime(2020, 5, 16, 22, 33, 1, tzinfo=timezone)
interval = timedelta(hours=1, seconds=6)
- trigger = IntervalTrigger(start_time=start_time, end_time=end_time, hours=1, seconds=6)
+ trigger = IntervalTrigger(
+ start_time=start_time, end_time=end_time, hours=1, seconds=6
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
@@ -37,11 +40,21 @@ def test_end_time(timezone, serializer):
def test_repr(timezone, serializer):
start_time = datetime(2020, 5, 15, 12, 55, 32, 954032, tzinfo=timezone)
end_time = datetime(2020, 6, 4, 16, 18, 49, 306942, tzinfo=timezone)
- trigger = IntervalTrigger(weeks=1, days=2, hours=3, minutes=4, seconds=5, microseconds=123525,
- start_time=start_time, end_time=end_time)
+ trigger = IntervalTrigger(
+ weeks=1,
+ days=2,
+ hours=3,
+ minutes=4,
+ seconds=5,
+ microseconds=123525,
+ start_time=start_time,
+ end_time=end_time,
+ )
if serializer:
trigger = serializer.deserialize(serializer.serialize(trigger))
- assert repr(trigger) == ("IntervalTrigger(weeks=1, days=2, hours=3, minutes=4, seconds=5, "
- "microseconds=123525, start_time='2020-05-15 12:55:32.954032+02:00', "
- "end_time='2020-06-04 16:18:49.306942+02:00')")
+ assert repr(trigger) == (
+ "IntervalTrigger(weeks=1, days=2, hours=3, minutes=4, seconds=5, "
+ "microseconds=123525, start_time='2020-05-15 12:55:32.954032+02:00', "
+ "end_time='2020-06-04 16:18:49.306942+02:00')"
+ )