summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers/sync.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-04 01:29:43 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-04 01:29:43 +0300
commit5f0627be932d561c92bedc684ab1b8c5646520d4 (patch)
tree078ae657f60dd1bbcdbbdf300d1c7942e958e64b /src/apscheduler/schedulers/sync.py
parent90a9675f4444809c72c25a450a40243be8110b68 (diff)
downloadapscheduler-5f0627be932d561c92bedc684ab1b8c5646520d4.tar.gz
Changed the scheduler API to require an explicit start call
Diffstat (limited to 'src/apscheduler/schedulers/sync.py')
-rw-r--r--src/apscheduler/schedulers/sync.py35
1 files changed, 24 insertions, 11 deletions
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index c821a6a..d98161a 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -4,6 +4,7 @@ import atexit
import os
import platform
import random
+import sys
import threading
from concurrent.futures import Future
from contextlib import ExitStack
@@ -38,6 +39,11 @@ from ..eventbrokers.local import LocalEventBroker
from ..marshalling import callable_to_ref
from ..workers.sync import Worker
+if sys.version_info >= (3, 11):
+ from typing import Self
+else:
+ from typing_extensions import Self
+
_microsecond_delta = timedelta(microseconds=1)
_zero_timedelta = timedelta()
@@ -57,15 +63,16 @@ class Scheduler:
_wakeup_event: threading.Event = attrs.field(init=False, factory=threading.Event)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
_services_initialized: bool = attrs.field(init=False, default=False)
- _exit_stack: ExitStack = attrs.field(init=False, factory=ExitStack)
+ _exit_stack: ExitStack | None = attrs.field(init=False, default=None)
_lock: threading.RLock = attrs.field(init=False, factory=threading.RLock)
def __attrs_post_init__(self) -> None:
if not self.identity:
self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
- def __enter__(self) -> Scheduler:
- self.start_in_background()
+ def __enter__(self: Self) -> Self:
+ self._exit_stack = ExitStack()
+ self._ensure_services_ready(self._exit_stack)
return self
def __exit__(
@@ -75,16 +82,24 @@ class Scheduler:
exc_tb: TracebackType,
) -> None:
self.stop()
- self._join_thread()
+ self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None:
"""Ensure that the data store and event broker have been initialized."""
- stack = exit_stack or self._exit_stack
with self._lock:
if not self._services_initialized:
+ if exit_stack is None:
+ if self._exit_stack is None:
+ self._exit_stack = exit_stack = ExitStack()
+ atexit.register(self._exit_stack.close)
+ else:
+ exit_stack = self._exit_stack
+
self._services_initialized = True
+ exit_stack.callback(setattr, self, "_services_initialized", False)
+
self.event_broker.start()
- stack.push(
+ exit_stack.push(
lambda *exc_info: self.event_broker.stop(
force=exc_info[0] is not None
)
@@ -92,13 +107,11 @@ class Scheduler:
# Initialize the data store
self.data_store.start(self.event_broker)
- stack.push(
+ exit_stack.push(
lambda *exc_info: self.data_store.stop(
force=exc_info[0] is not None
)
)
- if not exit_stack:
- atexit.register(self._exit_stack.close)
def _schedule_added_or_modified(self, event: Event) -> None:
event_ = cast("ScheduleAdded | ScheduleUpdated", event)
@@ -377,8 +390,8 @@ class Scheduler:
self._thread = None
raise
- atexit.register(self._join_thread)
- atexit.register(self.stop)
+ self._exit_stack.callback(self._join_thread)
+ self._exit_stack.callback(self.stop)
def stop(self) -> None:
"""