diff options
-rw-r--r-- | apscheduler/executors/debug.py | 15 | ||||
-rw-r--r-- | apscheduler/executors/pool.py | 63 | ||||
-rw-r--r-- | apscheduler/schedulers/base.py | 4 | ||||
-rw-r--r-- | docs/modules/executors/debug.rst | 10 | ||||
-rw-r--r-- | docs/modules/executors/pool.rst | 4 | ||||
-rw-r--r-- | docs/userguide.rst | 36 | ||||
-rw-r--r-- | examples/executors/processpool.py | 4 | ||||
-rw-r--r-- | tests/test_executors.py | 18 | ||||
-rw-r--r-- | tests/test_schedulers.py | 10 |
9 files changed, 96 insertions, 68 deletions
diff --git a/apscheduler/executors/debug.py b/apscheduler/executors/debug.py new file mode 100644 index 0000000..f9e5959 --- /dev/null +++ b/apscheduler/executors/debug.py @@ -0,0 +1,15 @@ +import sys + +from apscheduler.executors.base import BaseExecutor, run_job + + +class DebugExecutor(BaseExecutor): + """A special executor that executes the target callable directly instead of deferring it to a thread or process.""" + + def _do_submit_job(self, job, run_times): + try: + events = run_job(job, job._jobstore_alias, run_times, self._logger.name) + except: + self._run_job_error(job.id, *sys.exc_info()) + else: + self._run_job_success(job.id, events) diff --git a/apscheduler/executors/pool.py b/apscheduler/executors/pool.py index f3353a5..38bb7a5 100644 --- a/apscheduler/executors/pool.py +++ b/apscheduler/executors/pool.py @@ -1,52 +1,43 @@ +from abc import abstractmethod import concurrent.futures from apscheduler.executors.base import BaseExecutor, run_job -class DebugExecutor(concurrent.futures.Executor): - """A special executor that executes the target callable directly instead of deferring it to a thread or process.""" +class BasePoolExecutor(BaseExecutor): + @abstractmethod + def __init__(self, pool): + super(BasePoolExecutor, self).__init__() + self._pool = pool - def submit(self, fn, *args, **kwargs): - f = concurrent.futures.Future() - try: - retval = fn(*args, **kwargs) - except Exception as e: - f.set_exception(e) - else: - f.set_result(retval) + def _do_submit_job(self, job, run_times): + f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + callback = lambda f: self._run_job_success(job.id, f.result()) + f.add_done_callback(callback) - return f + def shutdown(self, wait=True): + self._pool.shutdown(wait) -class PoolExecutor(BaseExecutor): +class ThreadPoolExecutor(BasePoolExecutor): """ - An executor that runs jobs in a concurrent.futures thread or process pool. + An executor that runs jobs in a concurrent.futures thread pool. - :param str pool_type: - The type of the pool to create: - - * ``thread``: create a thread pool - * ``process``: create a process pool - * ``debug``: run jobs directly in the calling thread - :param max_workers: the size of the thread/process pool. Ignored for pool_type=debug. + :param max_workers: the maximum number of spawned threads. """ - def __init__(self, pool_type, max_workers=10): - super(PoolExecutor, self).__init__() + def __init__(self, max_workers=10): + pool = concurrent.futures.ThreadPoolExecutor(max_workers) + super(ThreadPoolExecutor, self).__init__(pool) - if pool_type == 'thread': - self._pool = concurrent.futures.ThreadPoolExecutor(max_workers) - elif pool_type == 'process': - self._pool = concurrent.futures.ProcessPoolExecutor(max_workers) - elif pool_type == 'debug': - self._pool = DebugExecutor() - else: - raise ValueError('Unknown pool type: %s' % pool_type) - def _do_submit_job(self, job, run_times): - f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) - callback = lambda f: self._run_job_success(job.id, f.result()) - f.add_done_callback(callback) +class ProcessPoolExecutor(BasePoolExecutor): + """ + An executor that runs jobs in a concurrent.futures process pool. - def shutdown(self, wait=True): - self._pool.shutdown(wait) + :param max_workers: the maximum number of spawned processes. + """ + + def __init__(self, max_workers=10): + pool = concurrent.futures.ProcessPoolExecutor(max_workers) + super(ProcessPoolExecutor, self).__init__(pool) diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py index eb2b5b0..bae8e37 100644 --- a/apscheduler/schedulers/base.py +++ b/apscheduler/schedulers/base.py @@ -12,7 +12,7 @@ import six from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor -from apscheduler.executors.pool import PoolExecutor +from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.base import ConflictingIdError, JobLookupError, BaseJobStore from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.job import Job @@ -581,7 +581,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): def _create_default_executor(self): """Creates a default executor store, specific to the particular scheduler type.""" - return PoolExecutor('thread') + return ThreadPoolExecutor() def _create_default_jobstore(self): """Creates a default job store, specific to the particular scheduler type.""" diff --git a/docs/modules/executors/debug.rst b/docs/modules/executors/debug.rst new file mode 100644 index 0000000..eadc94c --- /dev/null +++ b/docs/modules/executors/debug.rst @@ -0,0 +1,10 @@ +:mod:`apscheduler.executors.debug` +================================== + +.. automodule:: apscheduler.executors.debug + +Module Contents +--------------- + +.. autoclass:: DebugExecutor + :members: diff --git a/docs/modules/executors/pool.rst b/docs/modules/executors/pool.rst index 39bce1b..e1397cf 100644 --- a/docs/modules/executors/pool.rst +++ b/docs/modules/executors/pool.rst @@ -6,6 +6,8 @@ Module Contents --------------- -.. autoclass:: PoolExecutor +.. autoclass:: ThreadPoolExecutor :members: +.. autoclass:: ProcessPoolExecutor + :members: diff --git a/docs/userguide.rst b/docs/userguide.rst index 92764ce..385c4b5 100644 --- a/docs/userguide.rst +++ b/docs/userguide.rst @@ -89,10 +89,10 @@ If, however, you are in the position to choose freely, then the recommended choice due to its strong data integrity protection. Likewise, the choice of executors is usually made for you if you use one of the frameworks above. -Otherwise, the default :class:`~apscheduler.executors.pool.PoolExecutor` should be good enough for most purposes. -If your workload involves CPU intensive operations, you should configure your PoolExecutor to use process pooling -instead of thread pooling to make use of multiple CPU cores. You can add a second PoolExecutor for this purpose, and -only configure one of them for process pooling. +Otherwise, the default :class:`~apscheduler.executors.pool.ThreadPoolExecutor` should be good enough for most purposes. +If your workload involves CPU intensive operations, you should consider using +:class:`~apscheduler.executors.pool.ProcessPoolExecutor` instead to make use of multiple CPU cores. +You could even use both at once, adding the process pool executor as a secondary executor. .. _scheduler-config: @@ -118,8 +118,8 @@ Let's say you want to run BackgroundScheduler in your application with the defau # Initialize the rest of the application here, or before the scheduler initialization -This will get you a BackgroundScheduler with a MemoryJobStore named "default" and a PoolExecutor named "default" with a -default maximum thread count of 10. +This will get you a BackgroundScheduler with a MemoryJobStore named "default" and a ThreadPoolExecutor named "default" +with a default maximum thread count of 10. Now, suppose you want more. You want to have *two* job stores using *two* executors and you also want to tweak the default values for new jobs and set a different timezone. @@ -127,8 +127,8 @@ The following three examples are completely equivalent, and will get you: * a MongoDBJobStore named "mongo" * an SQLAlchemyJobStore named "default" (using SQLite) -* a PoolExecutor using threads, named "default", with a worker count of 20 -* a PoolExecutor using subprocesses, named "processpool", with a worker count of 5 +* a ThreadPoolExecutor named "default", with a worker count of 20 +* a ProcessPoolExecutor named "processpool", with a worker count of 5 * UTC as the scheduler's timezone * coalescing turned off for new jobs by default * a default maximum instance limit of 3 for new jobs @@ -140,7 +140,7 @@ Method 1:: from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore - from apscheduler.executors.pool import PoolExecutor + from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { @@ -148,8 +148,8 @@ Method 1:: 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { - 'default': PoolExecutor('thread', 20), - 'processpool': PoolExecutor('process', 5) + 'default': ThreadPoolExecutor(20), + 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, @@ -172,13 +172,11 @@ Method 2:: 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { - 'class': 'apscheduler.executors.pool:PoolExecutor', - 'type': 'thread', + 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { - 'class': 'apscheduler.executors.pool:PoolExecutor', - 'type': 'process', + 'class': 'apscheduler.executors.pool:ProcessPoolExecutor', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', @@ -193,7 +191,7 @@ Method 3:: from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore - from apscheduler.executors.pool import PoolExecutor + from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { @@ -201,8 +199,8 @@ Method 3:: 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { - 'default': PoolExecutor('thread', max_workers=20), - 'processpool': PoolExecutor('process', max_workers=5) + 'default': ThreadPoolExecutor(max_workers=20), + 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, @@ -253,7 +251,7 @@ requirements on your job: #. Any arguments to the callable must be serializable Of the builtin job stores, only MemoryJobStore doesn't serialize jobs. -Of the builtin executors, only a PoolExecutor configured for process pooling will serialize jobs. +Of the builtin executors, only ProcessPoolExecutor will serialize jobs. .. important:: If you schedule jobs in a persistent job store during your application's initialization, you **MUST** define an explicit ID for the job and use ``replace_existing=True`` or you will get a new copy of the job every time diff --git a/examples/executors/processpool.py b/examples/executors/processpool.py index b8988df..6bff793 100644 --- a/examples/executors/processpool.py +++ b/examples/executors/processpool.py @@ -6,7 +6,7 @@ from datetime import datetime import os from apscheduler.schedulers.blocking import BlockingScheduler -from apscheduler.executors.pool import PoolExecutor +from apscheduler.executors.pool import ProcessPoolExecutor def tick(): @@ -15,7 +15,7 @@ def tick(): if __name__ == '__main__': scheduler = BlockingScheduler() - scheduler.add_executor(PoolExecutor('process')) + scheduler.add_executor(ProcessPoolExecutor()) scheduler.add_job(tick, 'interval', seconds=3) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) diff --git a/tests/test_executors.py b/tests/test_executors.py index a2048e4..c3c9ae3 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -3,7 +3,7 @@ import time import pytest from apscheduler.executors.base import MaxInstancesReachedError -from apscheduler.executors.pool import PoolExecutor +from apscheduler.executors.pool import BasePoolExecutor try: @@ -19,9 +19,21 @@ def mock_scheduler(): return scheduler_ -@pytest.fixture(params=['thread', 'process']) +@pytest.fixture +def threadpoolexecutor(request): + from apscheduler.executors.pool import ThreadPoolExecutor + return ThreadPoolExecutor() + + +@pytest.fixture +def processpoolexecutor(request): + from apscheduler.executors.pool import ProcessPoolExecutor + return ProcessPoolExecutor() + + +@pytest.fixture(params=[threadpoolexecutor, processpoolexecutor], ids=['threadpool', 'processpool']) def executor(request, mock_scheduler): - executor_ = PoolExecutor(request.param) + executor_ = request.param(request) executor_.start(mock_scheduler, 'dummy') request.addfinalizer(executor_.shutdown) return executor_ diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 7197821..aa8a932 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -7,7 +7,7 @@ import pytest import six from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError -from apscheduler.executors.pool import PoolExecutor +from apscheduler.executors.debug import DebugExecutor from apscheduler.job import Job from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError from apscheduler.jobstores.memory import MemoryJobStore @@ -254,7 +254,7 @@ class TestBaseScheduler(object): @pytest.mark.parametrize('stopped', [True, False], ids=['stopped=True', 'stopped=False']) def test_add_executor(self, scheduler, stopped): scheduler._stopped = stopped - executor = PoolExecutor('debug') + executor = DebugExecutor() executor.start = MagicMock() scheduler.add_executor(executor) @@ -265,13 +265,13 @@ class TestBaseScheduler(object): assert executor.start.call_count == 0 def test_add_executor_already_exists(self, scheduler): - executor = PoolExecutor('debug') + executor = DebugExecutor() scheduler.add_executor(executor) exc = pytest.raises(KeyError, scheduler.add_executor, executor) assert exc.value.message == 'This scheduler already has an executor by the alias of "default"' def test_remove_executor(self, scheduler): - scheduler.add_executor(PoolExecutor('debug'), 'foo') + scheduler.add_executor(DebugExecutor(), 'foo') scheduler._dispatch_event = MagicMock() scheduler.remove_executor('foo') @@ -821,7 +821,7 @@ class TestProcessJobs(object): class SchedulerImplementationTestBase(object): @pytest.fixture(autouse=True) def executor(self, scheduler): - scheduler.add_executor(PoolExecutor('debug')) + scheduler.add_executor(DebugExecutor()) @pytest.fixture def start_scheduler(self, request, scheduler): |