summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-03-20 17:17:49 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-07-19 00:48:06 +0300
commitb4715e05b9c41be8684260b5bde404c6a8cfb909 (patch)
tree996772c133297e941f3306ac27d7e0f53c0c8130 /src/apscheduler/schedulers
parent42b55a103410e8bd13b14c68fd700724b03d50e8 (diff)
downloadapscheduler-b4715e05b9c41be8684260b5bde404c6a8cfb909.tar.gz
Wait until the scheduler has stopped before returning from stop()
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r--src/apscheduler/schedulers/sync.py21
1 files changed, 12 insertions, 9 deletions
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index bc2776e..4c7095e 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -248,8 +248,11 @@ class Scheduler:
def stop(self) -> None:
if self._state is RunState.started:
- self._state = RunState.stopping
- self._wakeup_event.set()
+ f = Future()
+ with self._events.subscribe(f.set_result, one_shot=True):
+ self._state = RunState.stopping
+ self._wakeup_event.set()
+ f.result()
def run_until_stopped(self):
self._state = RunState.starting
@@ -259,7 +262,9 @@ class Scheduler:
# Initialize the data store and start relaying events to the scheduler's event broker
exit_stack.enter_context(self.data_store)
- exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
+ exit_stack.enter_context(
+ self.data_store.events.subscribe(self._events.publish)
+ )
# Wake up the scheduler if the data store emits a significant schedule event
exit_stack.enter_context(
@@ -281,7 +286,9 @@ class Scheduler:
start_future: Future[Event] = Future()
with self._events.subscribe(start_future.set_result, one_shot=True):
executor = ThreadPoolExecutor(1)
- exit_stack.push(lambda exc_type, *args: executor.shutdown(wait=exc_type is None))
+ exit_stack.push(
+ lambda exc_type, *args: executor.shutdown(wait=exc_type is None)
+ )
run_future = executor.submit(self._run)
wait([start_future, run_future], return_when=FIRST_COMPLETED)
@@ -292,11 +299,7 @@ class Scheduler:
run_future.result()
def _run(self) -> None:
- if self._state is not RunState.starting:
- raise RuntimeError(
- f"This function cannot be called while the scheduler is in the "
- f"{self._state} state"
- )
+ assert self._state is RunState.starting
# Signal that the scheduler has started
self._state = RunState.started