diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-01-17 18:08:52 +0200 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-01-17 18:08:52 +0200 |
commit | 616b2e60a03984e059bcbfc8200a9f4a9ce75457 (patch) | |
tree | f8be6f3c50bee22995ad29a42f0bcf23b70840fb | |
parent | a114d56baa9acbe4f4113a4eef7040619bd2e4be (diff) | |
download | apscheduler-616b2e60a03984e059bcbfc8200a9f4a9ce75457.tar.gz |
Fixed broken process pool executor issue
Fixes #362.
-rw-r--r-- | apscheduler/executors/pool.py | 9 | ||||
-rw-r--r-- | docs/versionhistory.rst | 2 | ||||
-rw-r--r-- | tests/test_executors.py | 34 |
3 files changed, 43 insertions, 2 deletions
diff --git a/apscheduler/executors/pool.py b/apscheduler/executors/pool.py index 2f4ef45..302d4bd 100644 --- a/apscheduler/executors/pool.py +++ b/apscheduler/executors/pool.py @@ -1,4 +1,5 @@ from abc import abstractmethod +from concurrent.futures.process import BrokenProcessPool import concurrent.futures from apscheduler.executors.base import BaseExecutor, run_job @@ -19,7 +20,13 @@ class BasePoolExecutor(BaseExecutor): else: self._run_job_success(job.id, f.result()) - f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + try: + f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + except BrokenProcessPool: + self._logger.warning('Process pool is broken; replacing pool with a fresh instance') + self._pool = self._pool.__class__(self._pool._max_workers) + f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + f.add_done_callback(callback) def shutdown(self, wait=True): diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 772b520..8cca199 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -29,6 +29,8 @@ APScheduler, see the :doc:`migration section <migration>`. one search condition * Fixed a problem where bound methods added as jobs via textual references were called with an unwanted extra ``self`` argument (PR by Pengjie Song) +* Fixed ``BrokenPoolError`` in ``ProcessPoolExecutor`` so that it will automatically replace the + broken pool with a fresh instance 3.6.3 diff --git a/tests/test_executors.py b/tests/test_executors.py index 16defd2..dbc8b36 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -2,14 +2,18 @@ from datetime import datetime from threading import Event from types import TracebackType import gc +import os +import signal import time import pytest from pytz import UTC -from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED +from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED from apscheduler.executors.base import MaxInstancesReachedError, run_job +from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.job import Job +from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.base import BaseScheduler try: @@ -144,3 +148,31 @@ def test_run_job_memory_leak(): foos = [x for x in gc.get_objects() if type(x) is FooBar] assert len(foos) == 0 + + +def test_broken_pool(): + def listener(evt): + nonlocal pid + pid = evt.retval + event.set() + + pid = None + event = Event() + scheduler = BackgroundScheduler(executors={'default': ProcessPoolExecutor(1)}) + scheduler.add_listener(listener, EVENT_JOB_EXECUTED) + scheduler.add_job(os.getpid, 'date', run_date=datetime.now(UTC)) + scheduler.start() + + event.wait(3) + killed_pid = pid + os.kill(pid, signal.SIGTERM) + try: + os.waitpid(pid, 0) + except OSError: + pass + + event.clear() + scheduler.add_job(os.getpid, 'date', run_date=datetime.now(UTC)) + event.wait(3) + assert pid != killed_pid + scheduler.shutdown(True) |