summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2014-06-16 18:30:10 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2014-06-16 20:18:13 +0300
commita07a13a2cd00744c65de75b4076d26c4bb37898d (patch)
tree31e75972266ea4b163e2a483e56c8aaf05bb11b8
parentbb20daf50be0a35c449568e3961790ce8b153366 (diff)
downloadapscheduler-a07a13a2cd00744c65de75b4076d26c4bb37898d.tar.gz
Added entry points for executors and job stores too
Added shortcut methods for configuring executors and job stores
-rw-r--r--apscheduler/executors/asyncio.py6
-rw-r--r--apscheduler/executors/debug.py6
-rw-r--r--apscheduler/executors/gevent.py6
-rw-r--r--apscheduler/executors/pool.py4
-rw-r--r--apscheduler/executors/twisted.py6
-rw-r--r--apscheduler/jobstores/memory.py6
-rw-r--r--apscheduler/jobstores/mongodb.py2
-rw-r--r--apscheduler/jobstores/redis.py2
-rw-r--r--apscheduler/jobstores/sqlalchemy.py2
-rw-r--r--apscheduler/schedulers/base.py96
-rw-r--r--docs/userguide.rst15
-rw-r--r--examples/executors/processpool.py3
-rw-r--r--examples/jobstores/mongodb.py6
-rw-r--r--examples/jobstores/redis_.py6
-rw-r--r--examples/jobstores/sqlalchemy_.py3
-rw-r--r--setup.py14
-rw-r--r--tests/test_executors.py9
17 files changed, 140 insertions, 52 deletions
diff --git a/apscheduler/executors/asyncio.py b/apscheduler/executors/asyncio.py
index 198cdb5..2618f61 100644
--- a/apscheduler/executors/asyncio.py
+++ b/apscheduler/executors/asyncio.py
@@ -5,7 +5,11 @@ from apscheduler.executors.base import BaseExecutor, run_job
class AsyncIOExecutor(BaseExecutor):
- """Runs jobs in the default executor of the event loop."""
+ """
+ Runs jobs in the default executor of the event loop.
+
+ Plugin alias: ``asyncio``
+ """
def start(self, scheduler, alias):
super(AsyncIOExecutor, self).start(scheduler, alias)
diff --git a/apscheduler/executors/debug.py b/apscheduler/executors/debug.py
index f9e5959..590d957 100644
--- a/apscheduler/executors/debug.py
+++ b/apscheduler/executors/debug.py
@@ -4,7 +4,11 @@ from apscheduler.executors.base import BaseExecutor, run_job
class DebugExecutor(BaseExecutor):
- """A special executor that executes the target callable directly instead of deferring it to a thread or process."""
+ """
+ A special executor that executes the target callable directly instead of deferring it to a thread or process.
+
+ Plugin alias: ``debug``
+ """
def _do_submit_job(self, job, run_times):
try:
diff --git a/apscheduler/executors/gevent.py b/apscheduler/executors/gevent.py
index bc1069e..6281cdc 100644
--- a/apscheduler/executors/gevent.py
+++ b/apscheduler/executors/gevent.py
@@ -11,7 +11,11 @@ except ImportError: # pragma: nocover
class GeventExecutor(BaseExecutor):
- """Runs jobs as greenlets."""
+ """
+ Runs jobs as greenlets.
+
+ Plugin alias: ``gevent``
+ """
def _do_submit_job(self, job, run_times):
def callback(greenlet):
diff --git a/apscheduler/executors/pool.py b/apscheduler/executors/pool.py
index bb3aa56..63fc06e 100644
--- a/apscheduler/executors/pool.py
+++ b/apscheduler/executors/pool.py
@@ -23,6 +23,8 @@ class ThreadPoolExecutor(BasePoolExecutor):
"""
An executor that runs jobs in a concurrent.futures thread pool.
+ Plugin alias: ``threadpool``
+
:param max_workers: the maximum number of spawned threads.
"""
@@ -35,6 +37,8 @@ class ProcessPoolExecutor(BasePoolExecutor):
"""
An executor that runs jobs in a concurrent.futures process pool.
+ Plugin alias: ``processpool``
+
:param max_workers: the maximum number of spawned processes.
"""
diff --git a/apscheduler/executors/twisted.py b/apscheduler/executors/twisted.py
index 03a212a..a0b1570 100644
--- a/apscheduler/executors/twisted.py
+++ b/apscheduler/executors/twisted.py
@@ -4,7 +4,11 @@ from apscheduler.executors.base import BaseExecutor, run_job
class TwistedExecutor(BaseExecutor):
- """Runs jobs in the reactor's thread pool."""
+ """
+ Runs jobs in the reactor's thread pool.
+
+ Plugin alias: ``twisted``
+ """
def start(self, scheduler, alias):
super(TwistedExecutor, self).start(scheduler, alias)
diff --git a/apscheduler/jobstores/memory.py b/apscheduler/jobstores/memory.py
index 031a105..645391f 100644
--- a/apscheduler/jobstores/memory.py
+++ b/apscheduler/jobstores/memory.py
@@ -5,7 +5,11 @@ from apscheduler.util import datetime_to_utc_timestamp
class MemoryJobStore(BaseJobStore):
- """Stores jobs in an array in RAM. Provides no persistence support."""
+ """
+ Stores jobs in an array in RAM. Provides no persistence support.
+
+ Plugin alias: ``memory``
+ """
def __init__(self):
super(MemoryJobStore, self).__init__()
diff --git a/apscheduler/jobstores/mongodb.py b/apscheduler/jobstores/mongodb.py
index 7f66732..ff762f7 100644
--- a/apscheduler/jobstores/mongodb.py
+++ b/apscheduler/jobstores/mongodb.py
@@ -22,6 +22,8 @@ class MongoDBJobStore(BaseJobStore):
Stores jobs in a MongoDB database. Any leftover keyword arguments are directly passed to pymongo's `MongoClient
<http://api.mongodb.org/python/current/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient>`_.
+ Plugin alias: ``mongodb``
+
:param str database: database to store jobs in
:param str collection: collection to store jobs in
:param client: a :class:`~pymongo.mongo_client.MongoClient` instance to use instead of providing connection
diff --git a/apscheduler/jobstores/redis.py b/apscheduler/jobstores/redis.py
index ced4e7c..2b4ffd5 100644
--- a/apscheduler/jobstores/redis.py
+++ b/apscheduler/jobstores/redis.py
@@ -21,6 +21,8 @@ class RedisJobStore(BaseJobStore):
"""
Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's StrictRedis.
+ Plugin alias: ``redis``
+
:param int db: the database number to store jobs in
:param str jobs_key: key to store jobs in
:param str run_times_key: key to store the jobs' run times in
diff --git a/apscheduler/jobstores/sqlalchemy.py b/apscheduler/jobstores/sqlalchemy.py
index 9f3c460..f1692a5 100644
--- a/apscheduler/jobstores/sqlalchemy.py
+++ b/apscheduler/jobstores/sqlalchemy.py
@@ -20,6 +20,8 @@ class SQLAlchemyJobStore(BaseJobStore):
"""
Stores jobs in a database table using SQLAlchemy. The table will be created if it doesn't exist in the database.
+ Plugin alias: ``sqlalchemy``
+
:param str url: connection string (see `SQLAlchemy documentation
<http://docs.sqlalchemy.org/en/latest/core/engines.html?highlight=create_engine#database-urls>`_
on this)
diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py
index 088de87..96e5fc3 100644
--- a/apscheduler/schedulers/base.py
+++ b/apscheduler/schedulers/base.py
@@ -39,6 +39,10 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
_trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
_trigger_classes = {}
+ _executor_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.executors'))
+ _executor_classes = {}
+ _jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.jobstores'))
+ _jobstore_classes = {}
_stopped = True
#
@@ -158,11 +162,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
def running(self):
return not self._stopped
- def add_executor(self, executor, alias='default'):
+ def add_executor(self, executor, alias='default', **executor_opts):
"""
- Adds an executor to this scheduler.
+ Adds an executor to this scheduler. Any extra keyword arguments will be passed to the executor plugin's
+ constructor, assuming that the first argument is the name of an executor plugin.
- :param apscheduler.executors.base.BaseExecutor executor: the executor instance to be added
+ :param str|unicode|apscheduler.executors.base.BaseExecutor executor: either an executor instance or the name of
+ an executor plugin
:param str|unicode alias: alias for the scheduler
"""
@@ -170,7 +176,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
if alias in self._executors:
raise KeyError('This scheduler already has an executor by the alias of "%s"' % alias)
- self._executors[alias] = executor
+ if isinstance(executor, BaseExecutor):
+ self._executors[alias] = executor
+ elif isinstance(executor, six.string_types):
+ self._executors[alias] = executor = self._create_plugin_instance('executor', executor, executor_opts)
+ else:
+ raise TypeError('Expected an executor instance or a string, got %s instead' %
+ executor.__class__.__name__)
# Start the executor right away if the scheduler is running
if self.running:
@@ -195,18 +207,26 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_REMOVED, alias))
- def add_jobstore(self, jobstore, alias='default'):
+ def add_jobstore(self, jobstore, alias='default', **jobstore_opts):
"""
- Adds a job store to this scheduler.
+ Adds a job store to this scheduler. Any extra keyword arguments will be passed to the job store plugin's
+ constructor, assuming that the first argument is the name of a job store plugin.
- :param apscheduler.jobstores.base.BaseJobStore jobstore: job store to be added
+ :param str|unicode|apscheduler.jobstores.base.BaseJobStore jobstore: job store to be added
:param str|unicode alias: alias for the job store
"""
with self._jobstores_lock:
if alias in self._jobstores:
raise KeyError('This scheduler already has a job store by the alias of "%s"' % alias)
- self._jobstores[alias] = jobstore
+
+ if isinstance(jobstore, BaseJobStore):
+ self._jobstores[alias] = jobstore
+ elif isinstance(jobstore, six.string_types):
+ self._jobstores[alias] = jobstore = self._create_plugin_instance('jobstore', jobstore, jobstore_opts)
+ else:
+ raise TypeError('Expected a job store instance or a string, got %s instead' %
+ jobstore.__class__.__name__)
# Start the job store right away if the scheduler is running
if self.running:
@@ -563,9 +583,16 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
if isinstance(value, BaseExecutor):
self.add_executor(value, alias)
elif isinstance(value, MutableMapping):
- classname = value.pop('class')
- cls = maybe_ref(classname)
- executor = cls(**value)
+ executor_class = value.pop('class', None)
+ plugin = value.pop('type', None)
+ if plugin:
+ executor = self._create_plugin_instance('executor', plugin, value)
+ elif executor_class:
+ cls = maybe_ref(executor_class)
+ executor = cls(**value)
+ else:
+ raise ValueError('Cannot create executor "%s" -- either "type" or "class" must be defined' % alias)
+
self.add_executor(executor, alias)
else:
raise TypeError("Expected executor instance or dict for executors['%s'], got %s instead" % (
@@ -577,9 +604,16 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
if isinstance(value, BaseJobStore):
self.add_jobstore(value, alias)
elif isinstance(value, MutableMapping):
- classname = value.pop('class')
- cls = maybe_ref(classname)
- jobstore = cls(**value)
+ jobstore_class = value.pop('class', None)
+ plugin = value.pop('type', None)
+ if plugin:
+ jobstore = self._create_plugin_instance('jobstore', plugin, value)
+ elif jobstore_class:
+ cls = maybe_ref(jobstore_class)
+ jobstore = cls(**value)
+ else:
+ raise ValueError('Cannot create job store "%s" -- either "type" or "class" must be defined' % alias)
+
self.add_jobstore(jobstore, alias)
else:
raise TypeError("Expected job store instance or dict for jobstores['%s'], got %s instead" % (
@@ -696,6 +730,27 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
if wakeup:
self.wakeup()
+ def _create_plugin_instance(self, type_, alias, constructor_kwargs):
+ """Creates an instance of the given plugin type, loading the plugin first if necessary."""
+
+ plugin_container, class_container, base_class = {
+ 'trigger': (self._trigger_plugins, self._trigger_classes, BaseTrigger),
+ 'jobstore': (self._jobstore_plugins, self._jobstore_classes, BaseJobStore),
+ 'executor': (self._executor_plugins, self._executor_classes, BaseExecutor)
+ }[type_]
+
+ try:
+ plugin_cls = class_container[alias]
+ except KeyError:
+ if alias in plugin_container:
+ plugin_cls = class_container[alias] = plugin_container[alias].load()
+ if not issubclass(plugin_cls, base_class):
+ raise TypeError('The {0} entry point does not point to a {0} class'.format(type_))
+ else:
+ raise LookupError('No {0} by the name "{1}" was found'.format(type_, alias))
+
+ return plugin_cls(**constructor_kwargs)
+
def _create_trigger(self, trigger, trigger_args):
if isinstance(trigger, BaseTrigger):
return trigger
@@ -707,17 +762,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
# Use the scheduler's time zone if nothing else is specified
trigger_args.setdefault('timezone', self.timezone)
- try:
- trigger_cls = self._trigger_classes[trigger]
- except KeyError:
- if trigger in self._trigger_plugins:
- trigger_cls = self._trigger_classes[trigger] = self._trigger_plugins[trigger].load()
- if not issubclass(trigger_cls, BaseTrigger):
- raise TypeError('The trigger entry point does not point to a trigger class')
- else:
- raise LookupError('No trigger by the name "%s" was found' % trigger)
-
- return trigger_cls(**trigger_args)
+ # Instantiate the trigger class
+ return self._create_plugin_instance('trigger', trigger, trigger_args)
def _create_lock(self):
"""Creates a reentrant lock object."""
diff --git a/docs/userguide.rst b/docs/userguide.rst
index 385c4b5..009babb 100644
--- a/docs/userguide.rst
+++ b/docs/userguide.rst
@@ -94,6 +94,8 @@ If your workload involves CPU intensive operations, you should consider using
:class:`~apscheduler.executors.pool.ProcessPoolExecutor` instead to make use of multiple CPU cores.
You could even use both at once, adding the process pool executor as a secondary executor.
+You can find the plugin names of each job store and executor type in their respective API documentation pages.
+
.. _scheduler-config:
@@ -165,10 +167,10 @@ Method 2::
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
- 'class': 'apscheduler.jobstores.mongodb:MongoDBJobStore'
+ 'type': 'mongodb'
},
'apscheduler.jobstores.default': {
- 'class': 'apscheduler.jobstores.mongodb:SQLAlchemyJobStore',
+ 'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
@@ -176,7 +178,7 @@ Method 2::
'max_workers': '20'
},
'apscheduler.executors.processpool': {
- 'class': 'apscheduler.executors.pool:ProcessPoolExecutor',
+ 'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
@@ -189,17 +191,16 @@ Method 3::
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
- from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
+ from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
- 'mongo': MongoDBJobStore(),
+ 'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
- 'default': ThreadPoolExecutor(max_workers=20),
+ 'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
diff --git a/examples/executors/processpool.py b/examples/executors/processpool.py
index 6bff793..df1e82b 100644
--- a/examples/executors/processpool.py
+++ b/examples/executors/processpool.py
@@ -6,7 +6,6 @@ from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
-from apscheduler.executors.pool import ProcessPoolExecutor
def tick():
@@ -15,7 +14,7 @@ def tick():
if __name__ == '__main__':
scheduler = BlockingScheduler()
- scheduler.add_executor(ProcessPoolExecutor())
+ scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
diff --git a/examples/jobstores/mongodb.py b/examples/jobstores/mongodb.py
index 36a09e0..daf07ae 100644
--- a/examples/jobstores/mongodb.py
+++ b/examples/jobstores/mongodb.py
@@ -10,7 +10,6 @@ import sys
import os
from apscheduler.schedulers.blocking import BlockingScheduler
-from apscheduler.jobstores.mongodb import MongoDBJobStore
def alarm(time):
@@ -19,11 +18,10 @@ def alarm(time):
if __name__ == '__main__':
scheduler = BlockingScheduler()
- jobstore = MongoDBJobStore(collection='example_jobs')
+ scheduler.add_jobstore('mongodb', collection='example_jobs')
if len(sys.argv) > 1 and sys.argv[1] == '--clear':
- jobstore.remove_all_jobs()
+ scheduler.remove_all_jobs()
- scheduler.add_jobstore(jobstore)
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, run this example with the --clear argument.')
diff --git a/examples/jobstores/redis_.py b/examples/jobstores/redis_.py
index 13f6251..169bcc1 100644
--- a/examples/jobstores/redis_.py
+++ b/examples/jobstores/redis_.py
@@ -10,7 +10,6 @@ import sys
import os
from apscheduler.schedulers.blocking import BlockingScheduler
-from apscheduler.jobstores.redis import RedisJobStore
def alarm(time):
@@ -19,11 +18,10 @@ def alarm(time):
if __name__ == '__main__':
scheduler = BlockingScheduler()
- jobstore = RedisJobStore(jobs_key='example.jobs', run_times_key='example.run_times')
+ scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
if len(sys.argv) > 1 and sys.argv[1] == '--clear':
- jobstore.remove_all_jobs()
+ scheduler.remove_all_jobs()
- scheduler.add_jobstore(jobstore)
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, run this example with the --clear argument.')
diff --git a/examples/jobstores/sqlalchemy_.py b/examples/jobstores/sqlalchemy_.py
index 8951844..4db4e9c 100644
--- a/examples/jobstores/sqlalchemy_.py
+++ b/examples/jobstores/sqlalchemy_.py
@@ -10,7 +10,6 @@ import sys
import os
from apscheduler.schedulers.blocking import BlockingScheduler
-from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
def alarm(time):
@@ -20,7 +19,7 @@ def alarm(time):
if __name__ == '__main__':
scheduler = BlockingScheduler()
url = sys.argv[1] if len(sys.argv) > 1 else 'sqlite:///example.sqlite'
- scheduler.add_jobstore(SQLAlchemyJobStore(url))
+ scheduler.add_jobstore('sqlalchemy', url=url)
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, delete the example.sqlite file.')
diff --git a/setup.py b/setup.py
index 5910776..2e3888a 100644
--- a/setup.py
+++ b/setup.py
@@ -59,6 +59,20 @@ setup(
'date = apscheduler.triggers.date:DateTrigger',
'interval = apscheduler.triggers.interval:IntervalTrigger',
'cron = apscheduler.triggers.cron:CronTrigger'
+ ],
+ 'apscheduler.executors': [
+ 'debug = apscheduler.executors.debug:DebugExecutor',
+ 'threadpool = apscheduler.executors.pool:ThreadPoolExecutor',
+ 'processpool = apscheduler.executors.pool:ProcessPoolExecutor',
+ 'asyncio = apscheduler.executors.asyncio:AsyncIOExecutor',
+ 'gevent = apscheduler.executors.gevent:GeventExecutor',
+ 'twisted = apscheduler.executors.twisted:TwistedExecutor'
+ ],
+ 'apscheduler.jobstores': [
+ 'memory = apscheduler.jobstores.memory:MemoryJobStore',
+ 'sqlalchemy = apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore',
+ 'mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore',
+ 'redis = apscheduler.jobstores.redis:RedisJobStore'
]
}
)
diff --git a/tests/test_executors.py b/tests/test_executors.py
index c3c9ae3..f126f3d 100644
--- a/tests/test_executors.py
+++ b/tests/test_executors.py
@@ -3,7 +3,6 @@ import time
import pytest
from apscheduler.executors.base import MaxInstancesReachedError
-from apscheduler.executors.pool import BasePoolExecutor
try:
@@ -22,13 +21,17 @@ def mock_scheduler():
@pytest.fixture
def threadpoolexecutor(request):
from apscheduler.executors.pool import ThreadPoolExecutor
- return ThreadPoolExecutor()
+ executor = ThreadPoolExecutor()
+ request.addfinalizer(executor.shutdown)
+ return executor
@pytest.fixture
def processpoolexecutor(request):
from apscheduler.executors.pool import ProcessPoolExecutor
- return ProcessPoolExecutor()
+ executor = ProcessPoolExecutor()
+ request.addfinalizer(executor.shutdown)
+ return executor
@pytest.fixture(params=[threadpoolexecutor, processpoolexecutor], ids=['threadpool', 'processpool'])