summaryrefslogtreecommitdiff
path: root/src/apscheduler/workers/async_.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/workers/async_.py')
-rw-r--r--src/apscheduler/workers/async_.py251
1 files changed, 0 insertions, 251 deletions
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
deleted file mode 100644
index 4ef1bff..0000000
--- a/src/apscheduler/workers/async_.py
+++ /dev/null
@@ -1,251 +0,0 @@
-from __future__ import annotations
-
-import os
-import platform
-from asyncio import CancelledError
-from contextlib import AsyncExitStack
-from datetime import datetime, timezone
-from inspect import isawaitable
-from logging import Logger, getLogger
-from types import TracebackType
-from typing import Callable
-from uuid import UUID
-
-import anyio
-import attrs
-from anyio import (
- TASK_STATUS_IGNORED,
- create_task_group,
- get_cancelled_exc_class,
- move_on_after,
-)
-from anyio.abc import CancelScope, TaskGroup
-
-from .._context import current_job, current_worker
-from .._converters import as_async_datastore, as_async_eventbroker
-from .._enums import JobOutcome, RunState
-from .._events import JobAdded, JobReleased, WorkerStarted, WorkerStopped
-from .._structures import Job, JobInfo, JobResult
-from .._validators import positive_integer
-from ..abc import AsyncDataStore, AsyncEventBroker
-from ..eventbrokers.async_local import LocalAsyncEventBroker
-
-
-@attrs.define(eq=False)
-class AsyncWorker:
- """Runs jobs locally in a task group."""
-
- data_store: AsyncDataStore = attrs.field(converter=as_async_datastore)
- event_broker: AsyncEventBroker = attrs.field(
- converter=as_async_eventbroker, factory=LocalAsyncEventBroker
- )
- max_concurrent_jobs: int = attrs.field(
- kw_only=True, validator=positive_integer, default=100
- )
- identity: str = attrs.field(kw_only=True, default=None)
- logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
- # True if a scheduler owns this worker
- _is_internal: bool = attrs.field(kw_only=True, default=False)
-
- _state: RunState = attrs.field(init=False, default=RunState.stopped)
- _wakeup_event: anyio.Event = attrs.field(init=False)
- _task_group: TaskGroup = attrs.field(init=False)
- _acquired_jobs: set[Job] = attrs.field(init=False, factory=set)
- _running_jobs: set[UUID] = attrs.field(init=False, factory=set)
-
- def __attrs_post_init__(self) -> None:
- if not self.identity:
- self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
-
- async def __aenter__(self) -> AsyncWorker:
- self._task_group = create_task_group()
- await self._task_group.__aenter__()
- await self._task_group.start(self.run_until_stopped)
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException],
- exc_val: BaseException,
- exc_tb: TracebackType,
- ) -> None:
- self._state = RunState.stopping
- self._wakeup_event.set()
- await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
- del self._task_group
- del self._wakeup_event
-
- @property
- def state(self) -> RunState:
- """The current running state of the worker."""
- return self._state
-
- async def run_until_stopped(self, *, task_status=TASK_STATUS_IGNORED) -> None:
- """Run the worker until it is explicitly stopped."""
- if self._state is not RunState.stopped:
- raise RuntimeError(
- f'Cannot start the worker when it is in the "{self._state}" ' f"state"
- )
-
- self._state = RunState.starting
- self._wakeup_event = anyio.Event()
- async with AsyncExitStack() as exit_stack:
- if not self._is_internal:
- # Initialize the event broker
- await self.event_broker.start()
- exit_stack.push_async_exit(
- lambda *exc_info: self.event_broker.stop(
- force=exc_info[0] is not None
- )
- )
-
- # Initialize the data store
- await self.data_store.start(self.event_broker)
- exit_stack.push_async_exit(
- lambda *exc_info: self.data_store.stop(
- force=exc_info[0] is not None
- )
- )
-
- # Set the current worker
- token = current_worker.set(self)
- exit_stack.callback(current_worker.reset, token)
-
- # Wake up the worker if the data store emits a significant job event
- self.event_broker.subscribe(
- lambda event: self._wakeup_event.set(), {JobAdded}
- )
-
- # Signal that the worker has started
- self._state = RunState.started
- task_status.started()
- exception: BaseException | None = None
- try:
- await self.event_broker.publish_local(WorkerStarted())
-
- async with create_task_group() as tg:
- while self._state is RunState.started:
- limit = self.max_concurrent_jobs - len(self._running_jobs)
- jobs = await self.data_store.acquire_jobs(self.identity, limit)
- for job in jobs:
- task = await self.data_store.get_task(job.task_id)
- self._running_jobs.add(job.id)
- tg.start_soon(self._run_job, job, task.func)
-
- await self._wakeup_event.wait()
- self._wakeup_event = anyio.Event()
- except get_cancelled_exc_class():
- pass
- except BaseException as exc:
- exception = exc
- raise
- finally:
- self._state = RunState.stopped
-
- # CancelledError is a subclass of Exception in Python 3.7
- if not exception or isinstance(exception, CancelledError):
- self.logger.info("Worker stopped")
- elif isinstance(exception, Exception):
- self.logger.exception("Worker crashed")
- elif exception:
- self.logger.info(
- f"Worker stopped due to {exception.__class__.__name__}"
- )
-
- with move_on_after(3, shield=True):
- await self.event_broker.publish_local(
- WorkerStopped(exception=exception)
- )
-
- async def stop(self, *, force: bool = False) -> None:
- """
- Signal the worker that it should stop running jobs.
-
- This method does not wait for the worker to actually stop.
-
- """
- if self._state in (RunState.starting, RunState.started):
- self._state = RunState.stopping
- event = anyio.Event()
- self.event_broker.subscribe(
- lambda ev: event.set(), {WorkerStopped}, one_shot=True
- )
- if force:
- self._task_group.cancel_scope.cancel()
- else:
- self._wakeup_event.set()
-
- await event.wait()
-
- async def _run_job(self, job: Job, func: Callable) -> None:
- try:
- # Check if the job started before the deadline
- start_time = datetime.now(timezone.utc)
- if job.start_deadline is not None and start_time > job.start_deadline:
- result = JobResult.from_job(
- job,
- outcome=JobOutcome.missed_start_deadline,
- finished_at=start_time,
- )
- await self.data_store.release_job(self.identity, job.task_id, result)
- await self.event_broker.publish(
- JobReleased.from_result(result, self.identity)
- )
- return
-
- token = current_job.set(JobInfo.from_job(job))
- try:
- retval = func(*job.args, **job.kwargs)
- if isawaitable(retval):
- retval = await retval
- except get_cancelled_exc_class():
- self.logger.info("Job %s was cancelled", job.id)
- with CancelScope(shield=True):
- result = JobResult.from_job(
- job,
- outcome=JobOutcome.cancelled,
- )
- await self.data_store.release_job(
- self.identity, job.task_id, result
- )
- await self.event_broker.publish(
- JobReleased.from_result(result, self.identity)
- )
- except BaseException as exc:
- if isinstance(exc, Exception):
- self.logger.exception("Job %s raised an exception", job.id)
- else:
- self.logger.error(
- "Job %s was aborted due to %s", job.id, exc.__class__.__name__
- )
-
- result = JobResult.from_job(
- job,
- JobOutcome.error,
- exception=exc,
- )
- await self.data_store.release_job(
- self.identity,
- job.task_id,
- result,
- )
- await self.event_broker.publish(
- JobReleased.from_result(result, self.identity)
- )
- if not isinstance(exc, Exception):
- raise
- else:
- self.logger.info("Job %s completed successfully", job.id)
- result = JobResult.from_job(
- job,
- JobOutcome.success,
- return_value=retval,
- )
- await self.data_store.release_job(self.identity, job.task_id, result)
- await self.event_broker.publish(
- JobReleased.from_result(result, self.identity)
- )
- finally:
- current_job.reset(token)
- finally:
- self._running_jobs.remove(job.id)