diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-12 22:09:05 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-21 02:40:02 +0300 |
commit | c5727432736b55b7d76753307f14efdb962c2edf (patch) | |
tree | 005bd129694b56bd601d65c4cdf43828cfcd4381 /src/apscheduler/eventbrokers/local.py | |
parent | 26c4db062145fcb4f623ecfda96c42ce2414e8e1 (diff) | |
download | apscheduler-c5727432736b55b7d76753307f14efdb962c2edf.tar.gz |
Major refactoring
- Made SyncScheduler a synchronous wrapper for AsyncScheduler
- Removed workers as a user interface
- Removed synchronous interfaces for data stores and event brokers and refactored existing implementations to use the async interface
- Added the current_async_scheduler contextvar
- Added job executors
Diffstat (limited to 'src/apscheduler/eventbrokers/local.py')
-rw-r--r-- | src/apscheduler/eventbrokers/local.py | 72 |
1 files changed, 4 insertions, 68 deletions
diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py index 25ff2dd..27a3cfd 100644 --- a/src/apscheduler/eventbrokers/local.py +++ b/src/apscheduler/eventbrokers/local.py @@ -1,22 +1,15 @@ from __future__ import annotations -from asyncio import iscoroutinefunction -from concurrent.futures import ThreadPoolExecutor -from contextlib import ExitStack -from threading import Lock -from typing import Any, Callable, Iterable - import attrs from .._events import Event -from ..abc import EventBroker, Subscription from .base import BaseEventBroker @attrs.define(eq=False) -class LocalEventBroker(EventBroker, BaseEventBroker): +class LocalEventBroker(BaseEventBroker): """ - Synchronous, local event broker. + Asynchronous, local event broker. This event broker only broadcasts within the process it runs in, and is therefore not suitable for multi-node or multiprocess use cases. @@ -24,62 +17,5 @@ class LocalEventBroker(EventBroker, BaseEventBroker): Does not serialize events. """ - _executor: ThreadPoolExecutor = attrs.field(init=False) - _exit_stack: ExitStack = attrs.field(init=False) - _subscriptions_lock: Lock = attrs.field(init=False, factory=Lock) - - def start(self) -> None: - self._executor = ThreadPoolExecutor(1) - - def stop(self, *, force: bool = False) -> None: - self._executor.shutdown(wait=not force) - del self._executor - - def subscribe( - self, - callback: Callable[[Event], Any], - event_types: Iterable[type[Event]] | None = None, - *, - one_shot: bool = False, - ) -> Subscription: - if iscoroutinefunction(callback): - raise ValueError( - "Coroutine functions are not supported as callbacks on a synchronous " - "event source" - ) - - with self._subscriptions_lock: - return super().subscribe(callback, event_types, one_shot=one_shot) - - def unsubscribe(self, token: object) -> None: - with self._subscriptions_lock: - super().unsubscribe(token) - - def publish(self, event: Event) -> None: - self.publish_local(event) - - def publish_local(self, event: Event) -> None: - event_type = type(event) - with self._subscriptions_lock: - one_shot_tokens: list[object] = [] - for _token, subscription in self._subscriptions.items(): - if ( - subscription.event_types is None - or event_type in subscription.event_types - ): - self._executor.submit( - self._deliver_event, subscription.callback, event - ) - if subscription.one_shot: - one_shot_tokens.append(subscription.token) - - for token in one_shot_tokens: - super().unsubscribe(token) - - def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None: - try: - func(event) - except BaseException: - self._logger.exception( - "Error delivering %s event", event.__class__.__name__ - ) + async def publish(self, event: Event) -> None: + await self.publish_local(event) |