diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-12 22:09:05 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-21 02:40:02 +0300 |
commit | c5727432736b55b7d76753307f14efdb962c2edf (patch) | |
tree | 005bd129694b56bd601d65c4cdf43828cfcd4381 /src/apscheduler/executors/subprocess.py | |
parent | 26c4db062145fcb4f623ecfda96c42ce2414e8e1 (diff) | |
download | apscheduler-c5727432736b55b7d76753307f14efdb962c2edf.tar.gz |
Major refactoring
- Made SyncScheduler a synchronous wrapper for AsyncScheduler
- Removed workers as a user interface
- Removed synchronous interfaces for data stores and event brokers and refactored existing implementations to use the async interface
- Added the current_async_scheduler contextvar
- Added job executors
Diffstat (limited to 'src/apscheduler/executors/subprocess.py')
-rw-r--r-- | src/apscheduler/executors/subprocess.py | 33 |
1 files changed, 33 insertions, 0 deletions
diff --git a/src/apscheduler/executors/subprocess.py b/src/apscheduler/executors/subprocess.py new file mode 100644 index 0000000..e766e71 --- /dev/null +++ b/src/apscheduler/executors/subprocess.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from collections.abc import Callable +from contextlib import AsyncExitStack +from functools import partial +from typing import Any + +import attrs +from anyio import CapacityLimiter, to_process + +from .._structures import Job +from ..abc import JobExecutor + + +@attrs.define(eq=False, kw_only=True) +class ProcessPoolJobExecutor(JobExecutor): + """ + Executes functions in a process pool. + + :param max_workers: the maximum number of worker processes to keep + """ + + max_workers: int = 40 + _limiter: CapacityLimiter = attrs.field(init=False) + + async def start(self, exit_stack: AsyncExitStack) -> None: + self._limiter = CapacityLimiter(self.max_workers) + + async def run_job(self, func: Callable[..., Any], job: Job) -> Any: + wrapped = partial(func, *job.args, **job.kwargs) + return await to_process.run_sync( + wrapped, cancellable=True, limiter=self._limiter + ) |