diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-12 22:09:05 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-21 02:40:02 +0300 |
commit | c5727432736b55b7d76753307f14efdb962c2edf (patch) | |
tree | 005bd129694b56bd601d65c4cdf43828cfcd4381 /src/apscheduler/eventbrokers/asyncpg.py | |
parent | 26c4db062145fcb4f623ecfda96c42ce2414e8e1 (diff) | |
download | apscheduler-c5727432736b55b7d76753307f14efdb962c2edf.tar.gz |
Major refactoring
- Made SyncScheduler a synchronous wrapper for AsyncScheduler
- Removed workers as a user interface
- Removed synchronous interfaces for data stores and event brokers and refactored existing implementations to use the async interface
- Added the current_async_scheduler contextvar
- Added job executors
Diffstat (limited to 'src/apscheduler/eventbrokers/asyncpg.py')
-rw-r--r-- | src/apscheduler/eventbrokers/asyncpg.py | 52 |
1 files changed, 11 insertions, 41 deletions
diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py index 7e3045e..59f7292 100644 --- a/src/apscheduler/eventbrokers/asyncpg.py +++ b/src/apscheduler/eventbrokers/asyncpg.py @@ -5,10 +5,8 @@ from contextlib import AsyncExitStack from functools import partial from typing import TYPE_CHECKING, Any, Callable, cast -import anyio import asyncpg import attrs -import tenacity from anyio import ( TASK_STATUS_IGNORED, EndOfStream, @@ -18,20 +16,16 @@ from anyio import ( from anyio.streams.memory import MemoryObjectSendStream from asyncpg import Connection, InterfaceError -from .. import RetrySettings from .._events import Event from .._exceptions import SerializationError -from ..abc import Serializer -from ..serializers.json import JSONSerializer -from .async_local import LocalAsyncEventBroker -from .base import DistributedEventBrokerMixin +from .base import BaseExternalEventBroker if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncEngine @attrs.define(eq=False) -class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): +class AsyncpgEventBroker(BaseExternalEventBroker): """ An asynchronous, asyncpg_ based event broker that uses a PostgreSQL server to broadcast events using its ``NOTIFY`` mechanism. @@ -39,16 +33,13 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): .. _asyncpg: https://pypi.org/project/asyncpg/ :param connection_factory: a callable that creates an asyncpg connection - :param serializer: the serializer used to (de)serialize events for transport :param channel: the ``NOTIFY`` channel to use :param max_idle_time: maximum time to let the connection go idle, before sending a ``SELECT 1`` query to prevent a connection timeout """ connection_factory: Callable[[], Awaitable[Connection]] - serializer: Serializer = attrs.field(kw_only=True, factory=JSONSerializer) channel: str = attrs.field(kw_only=True, default="apscheduler") - retry_settings: RetrySettings = attrs.field(default=RetrySettings()) max_idle_time: float = attrs.field(kw_only=True, default=10) _send: MemoryObjectSendStream[str] = attrs.field(init=False) @@ -111,38 +102,17 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): factory = partial(asyncpg.connect, **connect_args) return cls(factory, **kwargs) - def _retry(self) -> tenacity.AsyncRetrying: - def after_attempt(retry_state: tenacity.RetryCallState) -> None: - self._logger.warning( - f"{self.__class__.__name__}: connection failure " - f"(attempt {retry_state.attempt_number}): " - f"{retry_state.outcome.exception()}", - ) + @property + def _temporary_failure_exceptions(self) -> tuple[type[Exception]]: + return OSError, InterfaceError - return tenacity.AsyncRetrying( - stop=self.retry_settings.stop, - wait=self.retry_settings.wait, - retry=tenacity.retry_if_exception_type((OSError, InterfaceError)), - after=after_attempt, - sleep=anyio.sleep, - reraise=True, + async def start(self, exit_stack: AsyncExitStack) -> None: + await super().start(exit_stack) + self._send = cast( + MemoryObjectSendStream[str], + await self._task_group.start(self._listen_notifications), ) - - async def start(self) -> None: - await super().start() - try: - self._send = cast( - MemoryObjectSendStream[str], - await self._task_group.start(self._listen_notifications), - ) - except BaseException: - await super().stop(force=True) - raise - - async def stop(self, *, force: bool = False) -> None: - self._send.close() - await super().stop(force=force) - self._logger.info("Stopped event broker") + await exit_stack.enter_async_context(self._send) async def _listen_notifications(self, *, task_status=TASK_STATUS_IGNORED) -> None: conn: Connection |