diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-04 01:29:43 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-04 01:29:43 +0300 |
commit | 5f0627be932d561c92bedc684ab1b8c5646520d4 (patch) | |
tree | 078ae657f60dd1bbcdbbdf300d1c7942e958e64b /src/apscheduler/schedulers/sync.py | |
parent | 90a9675f4444809c72c25a450a40243be8110b68 (diff) | |
download | apscheduler-5f0627be932d561c92bedc684ab1b8c5646520d4.tar.gz |
Changed the scheduler API to require an explicit start call
Diffstat (limited to 'src/apscheduler/schedulers/sync.py')
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 35 |
1 files changed, 24 insertions, 11 deletions
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index c821a6a..d98161a 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -4,6 +4,7 @@ import atexit import os import platform import random +import sys import threading from concurrent.futures import Future from contextlib import ExitStack @@ -38,6 +39,11 @@ from ..eventbrokers.local import LocalEventBroker from ..marshalling import callable_to_ref from ..workers.sync import Worker +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + _microsecond_delta = timedelta(microseconds=1) _zero_timedelta = timedelta() @@ -57,15 +63,16 @@ class Scheduler: _wakeup_event: threading.Event = attrs.field(init=False, factory=threading.Event) _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) _services_initialized: bool = attrs.field(init=False, default=False) - _exit_stack: ExitStack = attrs.field(init=False, factory=ExitStack) + _exit_stack: ExitStack | None = attrs.field(init=False, default=None) _lock: threading.RLock = attrs.field(init=False, factory=threading.RLock) def __attrs_post_init__(self) -> None: if not self.identity: self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" - def __enter__(self) -> Scheduler: - self.start_in_background() + def __enter__(self: Self) -> Self: + self._exit_stack = ExitStack() + self._ensure_services_ready(self._exit_stack) return self def __exit__( @@ -75,16 +82,24 @@ class Scheduler: exc_tb: TracebackType, ) -> None: self.stop() - self._join_thread() + self._exit_stack.__exit__(exc_type, exc_val, exc_tb) def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None: """Ensure that the data store and event broker have been initialized.""" - stack = exit_stack or self._exit_stack with self._lock: if not self._services_initialized: + if exit_stack is None: + if self._exit_stack is None: + self._exit_stack = exit_stack = ExitStack() + atexit.register(self._exit_stack.close) + else: + exit_stack = self._exit_stack + self._services_initialized = True + exit_stack.callback(setattr, self, "_services_initialized", False) + self.event_broker.start() - stack.push( + exit_stack.push( lambda *exc_info: self.event_broker.stop( force=exc_info[0] is not None ) @@ -92,13 +107,11 @@ class Scheduler: # Initialize the data store self.data_store.start(self.event_broker) - stack.push( + exit_stack.push( lambda *exc_info: self.data_store.stop( force=exc_info[0] is not None ) ) - if not exit_stack: - atexit.register(self._exit_stack.close) def _schedule_added_or_modified(self, event: Event) -> None: event_ = cast("ScheduleAdded | ScheduleUpdated", event) @@ -377,8 +390,8 @@ class Scheduler: self._thread = None raise - atexit.register(self._join_thread) - atexit.register(self.stop) + self._exit_stack.callback(self._join_thread) + self._exit_stack.callback(self.stop) def stop(self) -> None: """ |