summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2014-02-22 09:18:58 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2014-02-24 08:58:11 +0200
commit42941673a50cc235d25daf0da2cf1cca4255004b (patch)
treee24d8ec8f30b888b3288c7fea534455bed858e13
parent6e6649ad14bf3397f0be3a22676968f006969c5f (diff)
downloadapscheduler-42941673a50cc235d25daf0da2cf1cca4255004b.tar.gz
Implemented integration with various event loops
-rw-r--r--CHANGES.rst5
-rw-r--r--README.rst25
-rw-r--r--apscheduler/__init__.py2
-rw-r--r--apscheduler/schedulers/__init__.py12
-rw-r--r--apscheduler/schedulers/asyncio.py53
-rw-r--r--apscheduler/schedulers/background.py28
-rw-r--r--apscheduler/schedulers/base.py (renamed from apscheduler/scheduler.py)413
-rw-r--r--apscheduler/schedulers/blocking.py28
-rw-r--r--apscheduler/schedulers/gevent.py32
-rw-r--r--apscheduler/schedulers/qt.py46
-rw-r--r--apscheduler/schedulers/tornado.py54
-rw-r--r--apscheduler/schedulers/twisted.py53
-rw-r--r--apscheduler/util.py21
-rw-r--r--docs/index.rst16
-rw-r--r--examples/persistent.py7
-rw-r--r--examples/reference.py5
-rw-r--r--examples/schedulers/asyncio_.py25
-rw-r--r--examples/schedulers/background.py26
-rw-r--r--examples/schedulers/blocking.py (renamed from examples/interval.py)7
-rw-r--r--examples/schedulers/gevent_.py24
-rw-r--r--examples/schedulers/qt.py37
-rw-r--r--examples/schedulers/tornado_.py25
-rw-r--r--examples/schedulers/twisted_.py25
-rw-r--r--examples/threaded.py25
-rw-r--r--tests/conftest.py2
-rw-r--r--tests/test_integration.py47
-rw-r--r--tests/test_scheduler.py465
-rw-r--r--tests/test_schedulers.py577
-rw-r--r--tests/test_util.py28
-rw-r--r--tox.ini34
30 files changed, 1352 insertions, 795 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index fbbc417..f5090b8 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -7,6 +7,11 @@ APScheduler, see the :doc:`migration section <migration>`.
3.0.0
-----
+* Split the old Scheduler class into BlockingScheduler and BackgroundScheduler and added integration for
+ asyncio (PEP 3156), Gevent, Tornado, Twisted and Qt event loops
+
+* Added a hook for customizing reading of current time
+
* Added support for timezones
* Removed the shortcuts to built-in triggers in the scheduler API so they no longer get preferential treatment
diff --git a/README.rst b/README.rst
index 98702c9..5a92f60 100644
--- a/README.rst
+++ b/README.rst
@@ -15,17 +15,25 @@ provides features not present in Quartz (such as multiple job stores).
Features
========
-* No (hard) external dependencies, except for setuptools/distribute
* Thread-safe API
-* Excellent test coverage (tested on CPython 2.6 - 2.7, 3.2 - 3.3, Jython 2.5.3, PyPy 1.9)
+* Excellent test coverage (tested on CPython 2.6 - 2.7, 3.2 - 3.4, PyPy 2.2)
* Configurable scheduling mechanisms (triggers):
* Cron-like scheduling
* Delayed scheduling of single run jobs (like the UNIX "at" command)
* Interval-based (run a job at specified time intervals)
+* Integrates with several frameworks:
+
+ * `asyncio <http://docs.python.org/3.4/library/asyncio.html>`_
+ (`PEP 3156 <http://www.python.org/dev/peps/pep-3156/>`_)
+ * `gevent <http://www.gevent.org/>`_
+ * `Tornado <http://www.tornadoweb.org/>`_
+ * `Twisted <http://twistedmatrix.com/>`_
+ * `Qt <http://qt-project.org/>`_ (using either `PyQt <http://www.riverbankcomputing.com/software/pyqt/intro>`_
+ or `PySide <http://qt-project.org/wiki/PySide>`_)
* Multiple, simultaneously active job stores:
- * RAM
+ * Memory
* File-based simple database (shelve)
* `SQLAlchemy <http://www.sqlalchemy.org/>`_ (any supported RDBMS works)
* `MongoDB <http://www.mongodb.org/>`_
@@ -41,14 +49,13 @@ Documentation can be found `here <http://readthedocs.org/docs/apscheduler/en/lat
Source
======
-The source can be browsed at `Bitbucket
-<http://bitbucket.org/agronholm/apscheduler/src/>`_.
+The source can be browsed at `Bitbucket <http://bitbucket.org/agronholm/apscheduler/src/>`_.
Reporting bugs
==============
-A `bug tracker <http://bitbucket.org/agronholm/apscheduler/issues/>`_
+A `bug tracker <https://bitbucket.org/agronholm/apscheduler/issues?status=new&status=open>`_
is provided by bitbucket.org.
@@ -57,7 +64,5 @@ Getting help
If you have problems or other questions, you can either:
-* Ask on the `APScheduler Google group
- <http://groups.google.com/group/apscheduler>`_, or
-* Ask on the ``#apscheduler`` channel on
- `Freenode IRC <http://freenode.net/irc_servers.shtml>`_
+* Ask on the `APScheduler Google group <http://groups.google.com/group/apscheduler>`_, or
+* Ask on the ``#apscheduler`` channel on `Freenode IRC <http://freenode.net/irc_servers.shtml>`_
diff --git a/apscheduler/__init__.py b/apscheduler/__init__.py
index d5b383d..4d7ecb0 100644
--- a/apscheduler/__init__.py
+++ b/apscheduler/__init__.py
@@ -1,3 +1,3 @@
-version_info = (3, 0, 0, 'dev1')
+version_info = (3, 0, 0, 'pre1')
version = '.'.join(str(n) for n in version_info[:3])
release = '.'.join(str(n) for n in version_info)
diff --git a/apscheduler/schedulers/__init__.py b/apscheduler/schedulers/__init__.py
new file mode 100644
index 0000000..bd8a790
--- /dev/null
+++ b/apscheduler/schedulers/__init__.py
@@ -0,0 +1,12 @@
+class SchedulerAlreadyRunningError(Exception):
+ """Raised when attempting to start or configure the scheduler when it's already running."""
+
+ def __str__(self):
+ return 'Scheduler is already running'
+
+
+class SchedulerNotRunningError(Exception):
+ """Raised when attempting to shutdown the scheduler when it's not running."""
+
+ def __str__(self):
+ return 'Scheduler is not running'
diff --git a/apscheduler/schedulers/asyncio.py b/apscheduler/schedulers/asyncio.py
new file mode 100644
index 0000000..8bd35ee
--- /dev/null
+++ b/apscheduler/schedulers/asyncio.py
@@ -0,0 +1,53 @@
+from __future__ import absolute_import
+from functools import wraps
+
+from apscheduler.schedulers.base import BaseScheduler
+from apscheduler.util import maybe_ref
+
+try:
+ import asyncio
+except ImportError: # pragma: nocover
+ raise ImportError('AsyncIOScheduler requires either Python 3.4 or the asyncio package installed')
+
+
+def run_in_event_loop(func):
+ @wraps(func)
+ def wrapper(self, *args, **kwargs):
+ self._eventloop.call_soon_threadsafe(func, self, *args, **kwargs)
+ return wrapper
+
+
+class AsyncIOScheduler(BaseScheduler):
+ """A scheduler that runs on an asyncio (PEP 3156) event loop."""
+
+ _eventloop = None
+ _timeout = None
+
+ def start(self):
+ super(AsyncIOScheduler, self).start()
+ self._wakeup()
+
+ @run_in_event_loop
+ def shutdown(self, wait=True):
+ super(AsyncIOScheduler, self).shutdown(wait)
+ self._stop_timer()
+
+ def _configure(self, config):
+ self._eventloop = maybe_ref(config.pop('event_loop', None)) or asyncio.get_event_loop()
+ super(AsyncIOScheduler, self)._configure(config)
+
+ def _start_timer(self, wait_seconds):
+ self._stop_timer()
+ if wait_seconds is not None:
+ self._timeout = self._eventloop.call_later(wait_seconds, self._wakeup)
+
+ def _stop_timer(self):
+ if self._timeout:
+ self._timeout.cancel()
+ del self._timeout
+
+ @run_in_event_loop
+ def _wakeup(self):
+ self._stop_timer()
+ wait_seconds = self._process_jobs()
+ self._start_timer(wait_seconds)
diff --git a/apscheduler/schedulers/background.py b/apscheduler/schedulers/background.py
new file mode 100644
index 0000000..c6fdff3
--- /dev/null
+++ b/apscheduler/schedulers/background.py
@@ -0,0 +1,28 @@
+from __future__ import absolute_import
+from threading import Thread, Event
+
+from apscheduler.schedulers.base import BaseScheduler
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.util import asbool
+
+
+class BackgroundScheduler(BlockingScheduler):
+ """A scheduler that runs in the background using a separate thread."""
+
+ _thread = None
+
+ def _configure(self, config):
+ self._daemon = asbool(config.pop('daemon', True))
+ super(BackgroundScheduler, self)._configure(config)
+
+ def start(self):
+ BaseScheduler.start(self)
+ self._event = Event()
+ self._thread = Thread(target=self._main_loop, name='APScheduler')
+ self._thread.daemon = self._daemon
+ self._thread.start()
+
+ def shutdown(self, wait=True):
+ super(BackgroundScheduler, self).shutdown(wait)
+ self._thread.join()
+ del self._thread
diff --git a/apscheduler/scheduler.py b/apscheduler/schedulers/base.py
index 1be0162..c4a6af0 100644
--- a/apscheduler/scheduler.py
+++ b/apscheduler/schedulers/base.py
@@ -1,7 +1,8 @@
"""
This module is the main part of the library. It houses the Scheduler class and related exceptions.
"""
-from threading import Thread, Event, Lock
+from abc import ABCMeta, abstractmethod
+from threading import Lock
from datetime import datetime, timedelta
from logging import getLogger
from collections import Mapping, Iterable
@@ -10,9 +11,11 @@ import os
import sys
from pkg_resources import iter_entry_points
-from dateutil.tz import gettz, tzlocal
-from six import string_types, u, itervalues, iteritems
+from dateutil.tz import tzlocal
+from six import u, itervalues, iteritems
+import six
+from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
from apscheduler.util import *
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.job import Job, MaxInstancesReachedError
@@ -24,33 +27,23 @@ try:
except ImportError:
from inspect import getargspec
-logger = getLogger(__name__)
+class BaseScheduler(six.with_metaclass(ABCMeta)):
+ """Base class for all schedulers."""
-class SchedulerAlreadyRunningError(Exception):
- """
- Raised when attempting to start or configure the scheduler when it's already running.
- """
-
- def __str__(self):
- return 'Scheduler is already running'
-
-
-class Scheduler(object):
- """
- This class is responsible for scheduling jobs and triggering their execution.
- """
-
- _stopped = False
- _thread = None
+ _stopped = True
_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
+ #
+ # Public API
+ #
+
def __init__(self, gconfig={}, **options):
- self._wakeup = Event()
+ super(BaseScheduler, self).__init__()
self._jobstores = {}
- self._jobstores_lock = Lock()
+ self._jobstores_lock = self._create_lock()
self._listeners = []
- self._listeners_lock = Lock()
+ self._listeners_lock = self._create_lock()
self._pending_jobs = []
self._triggers = {}
self.configure(gconfig, **options)
@@ -62,47 +55,13 @@ class Scheduler(object):
if self.running:
raise SchedulerAlreadyRunningError
- # Set general options
config = combine_opts(gconfig, 'apscheduler.', options)
- self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
- self.coalesce = asbool(config.pop('coalesce', True))
- self.daemonic = asbool(config.pop('daemonic', True))
- self.standalone = asbool(config.pop('standalone', False))
- timezone = config.pop('timezone', None)
- self.timezone = gettz(timezone) if isinstance(timezone, string_types) else timezone or tzlocal()
-
- # Set trigger defaults
- self.trigger_defaults = {'timezone': self.timezone}
-
- # Configure the thread pool
- if 'threadpool' in config:
- self._threadpool = maybe_ref(config['threadpool'])
- else:
- threadpool_opts = combine_opts(config, 'threadpool.')
- self._threadpool = ThreadPool(**threadpool_opts)
-
- # Configure job stores
- jobstore_opts = combine_opts(config, 'jobstore.')
- jobstores = {}
- for key, value in jobstore_opts.items():
- store_name, option = key.split('.', 1)
- opts_dict = jobstores.setdefault(store_name, {})
- opts_dict[option] = value
-
- for alias, opts in jobstores.items():
- classname = opts.pop('class')
- cls = maybe_ref(classname)
- jobstore = cls(**opts)
- self.add_jobstore(jobstore, alias, True)
+ self._configure(config)
+ @abstractmethod
def start(self):
- """
- Starts the scheduler in a new thread.
+ """Starts the scheduler. The details of this process depend on the implementation."""
- In threaded mode (the default), this method will return immediately after starting the scheduler thread.
-
- In standalone mode, this method will block until there are no more scheduled jobs.
- """
if self.running:
raise SchedulerAlreadyRunningError
@@ -116,44 +75,36 @@ class Scheduler(object):
del self._pending_jobs[:]
self._stopped = False
- if self.standalone:
- self._main_loop()
- else:
- self._thread = Thread(target=self._main_loop, name='APScheduler')
- self._thread.setDaemon(self.daemonic)
- self._thread.start()
+ self.logger.info('Scheduler started')
+
+ # Notify listeners that the scheduler has been started
+ self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))
- def shutdown(self, wait=True, shutdown_threadpool=True, close_jobstores=True):
+ @abstractmethod
+ def shutdown(self, wait=True):
"""
- Shuts down the scheduler and terminates the thread. Does not interrupt any currently running jobs.
+ Shuts down the scheduler. Does not interrupt any currently running jobs.
- :param wait: ``True`` to wait until all currently executing jobs have finished (if ``shutdown_threadpool`` is
- also ``True``)
- :param shutdown_threadpool: ``True`` to shut down the thread pool
- :param close_jobstores: ``True`` to close all job stores after shutdown
+ :param wait: ``True`` to wait until all currently executing jobs have finished
"""
if not self.running:
- return
+ raise SchedulerNotRunningError
self._stopped = True
- self._wakeup.set()
# Shut down the thread pool
- if shutdown_threadpool:
- self._threadpool.shutdown(wait)
-
- # Wait until the scheduler thread terminates
- if self._thread:
- self._thread.join()
+ self._threadpool.shutdown(wait)
# Close all job stores
- if close_jobstores:
- for jobstore in itervalues(self._jobstores):
- jobstore.close()
+ for jobstore in itervalues(self._jobstores):
+ jobstore.close()
+
+ self.logger.info('Scheduler has been shut down')
+ self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))
@property
def running(self):
- return not self._stopped and self._thread and self._thread.isAlive()
+ return not self._stopped
def add_jobstore(self, jobstore, alias, quiet=False):
"""
@@ -175,8 +126,8 @@ class Scheduler(object):
self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias))
# Notify the scheduler so it can scan the new job store for jobs
- if not quiet:
- self._wakeup.set()
+ if not quiet and self.running:
+ self._wakeup()
def remove_jobstore(self, alias, close=True):
"""
@@ -218,80 +169,6 @@ class Scheduler(object):
if callback == cb:
del self._listeners[i]
- def _notify_listeners(self, event):
- with self._listeners_lock:
- listeners = tuple(self._listeners)
-
- for cb, mask in listeners:
- if event.code & mask:
- try:
- cb(event)
- except:
- logger.exception('Error notifying listener')
-
- def _real_add_job(self, job, jobstore, wakeup):
- # Recalculate the next run time
- job.compute_next_run_time(datetime.now(self.timezone))
-
- # Add the job to the given job store
- store = self._jobstores.get(jobstore)
- if not store:
- raise KeyError('No such job store: %s' % jobstore)
- store.add_job(job)
-
- # Notify listeners that a new job has been added
- event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
- self._notify_listeners(event)
-
- logger.info('Added job "%s" to job store "%s"', job, jobstore)
-
- # Notify the scheduler about the new job
- if wakeup:
- self._wakeup.set()
-
- @staticmethod
- def _check_callable_args(func, args, kwargs):
- if not isfunction(func) and not ismethod(func) and hasattr(func, '__call__'):
- func = func.__call__
- argspec = getargspec(func)
- argspec_args = argspec.args[1:] if ismethod(func) else argspec.args
- varkw = getattr(argspec, 'varkw', None) or getattr(argspec, 'keywords', None)
- kwargs_set = frozenset(kwargs)
- mandatory_args = frozenset(argspec_args[:-len(argspec.defaults)] if argspec.defaults else argspec_args)
- mandatory_args_matches = frozenset(argspec_args[:len(args)])
- mandatory_kwargs_matches = set(kwargs).intersection(mandatory_args)
- kwonly_args = frozenset(getattr(argspec, 'kwonlyargs', []))
- kwonly_defaults = frozenset(getattr(argspec, 'kwonlydefaults', None) or ())
-
- # Make sure there are no conflicts between args and kwargs
- pos_kwargs_conflicts = mandatory_args_matches.intersection(mandatory_kwargs_matches)
- if pos_kwargs_conflicts:
- raise ValueError('The following arguments are supplied in both args and kwargs: %s' %
- ', '.join(pos_kwargs_conflicts))
-
- # Check that the number of positional arguments minus the number of matched kwargs matches the argspec
- missing_args = mandatory_args - mandatory_args_matches.union(mandatory_kwargs_matches)
- if missing_args:
- raise ValueError('The following arguments are not supplied: %s' % ', '.join(missing_args))
-
- # Check that the callable can accept the given number of positional arguments
- if not argspec.varargs and len(args) > len(argspec_args):
- raise ValueError('The list of positional arguments is longer than the target callable can handle '
- '(allowed: %d, given in args: %d)' % (len(argspec_args), len(args)))
-
- # Check that the callable can accept the given keyword arguments
- if not varkw:
- unmatched_kwargs = kwargs_set - frozenset(argspec_args).union(kwonly_args)
- if unmatched_kwargs:
- raise ValueError('The target callable does not accept the following keyword arguments: %s' %
- ', '.join(unmatched_kwargs))
-
- # Check that all keyword-only arguments have been supplied
- unmatched_kwargs = kwonly_args - kwargs_set - kwonly_defaults
- if unmatched_kwargs:
- raise ValueError('The following keyword-only arguments have not been supplied in kwargs: %s' %
- ', '.join(unmatched_kwargs))
-
def add_job(self, func, trigger, trigger_args=(), args=None, kwargs=None, misfire_grace_time=None, coalesce=None,
name=None, max_runs=None, max_instances=1, jobstore='default'):
"""
@@ -367,13 +244,13 @@ class Scheduler(object):
self._check_callable_args(job.func, args, kwargs)
# Ensure that dead-on-arrival jobs are never added
- if job.compute_next_run_time(datetime.now(self.timezone)) is None:
+ if job.compute_next_run_time(self._current_time()) is None:
raise ValueError('Not adding job since it would never be run')
# Don't really add jobs to job stores before the scheduler is up and running
if not self.running:
self._pending_jobs.append((job, jobstore))
- logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts')
+ self.logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts')
else:
self._real_add_job(job, jobstore, True)
@@ -388,15 +265,6 @@ class Scheduler(object):
return func
return inner
- def _remove_job(self, job, alias, jobstore):
- jobstore.remove_job(job)
-
- # Notify listeners that a job has been removed
- event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job)
- self._notify_listeners(event)
-
- logger.info('Removed job "%s"', job)
-
def get_jobs(self):
"""
Returns a list of all scheduled jobs.
@@ -456,31 +324,161 @@ class Scheduler(object):
out.write(os.linesep.join(job_strs) + os.linesep)
+ #
+ # Protected API
+ #
+
+ def _configure(self, config):
+ # Set general options
+ self.logger = maybe_ref(config.pop('logger', None)) or getLogger('apscheduler')
+ self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
+ self.coalesce = asbool(config.pop('coalesce', True))
+ self.timezone = astimezone(config.pop('timezone', None)) or tzlocal()
+
+ # Set trigger defaults
+ self.trigger_defaults = {'timezone': self.timezone}
+
+ # Configure the thread pool
+ if 'threadpool' in config:
+ self._threadpool = maybe_ref(config['threadpool'])
+ else:
+ threadpool_opts = combine_opts(config, 'threadpool.')
+ self._threadpool = ThreadPool(**threadpool_opts)
+
+ # Configure job stores
+ jobstore_opts = combine_opts(config, 'jobstore.')
+ jobstores = {}
+ for key, value in jobstore_opts.items():
+ store_name, option = key.split('.', 1)
+ opts_dict = jobstores.setdefault(store_name, {})
+ opts_dict[option] = value
+
+ for alias, opts in jobstores.items():
+ classname = opts.pop('class')
+ cls = maybe_ref(classname)
+ jobstore = cls(**opts)
+ self.add_jobstore(jobstore, alias, True)
+
+ def _notify_listeners(self, event):
+ with self._listeners_lock:
+ listeners = tuple(self._listeners)
+
+ for cb, mask in listeners:
+ if event.code & mask:
+ try:
+ cb(event)
+ except:
+ self.logger.exception('Error notifying listener')
+
+ def _real_add_job(self, job, jobstore, wakeup):
+ # Recalculate the next run time
+ job.compute_next_run_time(self._current_time())
+
+ # Add the job to the given job store
+ store = self._jobstores.get(jobstore)
+ if not store:
+ raise KeyError('No such job store: %s' % jobstore)
+ store.add_job(job)
+
+ # Notify listeners that a new job has been added
+ event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
+ self._notify_listeners(event)
+
+ self.logger.info('Added job "%s" to job store "%s"', job, jobstore)
+
+ # Notify the scheduler about the new job
+ if wakeup:
+ self._wakeup()
+
+ def _remove_job(self, job, alias, jobstore):
+ jobstore.remove_job(job)
+
+ # Notify listeners that a job has been removed
+ event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job)
+ self._notify_listeners(event)
+
+ self.logger.info('Removed job "%s"', job)
+
+ @staticmethod
+ def _check_callable_args(func, args, kwargs):
+ """Ensures that the given callable can be called with the given arguments."""
+
+ if not isfunction(func) and not ismethod(func) and hasattr(func, '__call__'):
+ func = func.__call__
+ argspec = getargspec(func)
+ argspec_args = argspec.args[1:] if ismethod(func) else argspec.args
+ varkw = getattr(argspec, 'varkw', None) or getattr(argspec, 'keywords', None)
+ kwargs_set = frozenset(kwargs)
+ mandatory_args = frozenset(argspec_args[:-len(argspec.defaults)] if argspec.defaults else argspec_args)
+ mandatory_args_matches = frozenset(argspec_args[:len(args)])
+ mandatory_kwargs_matches = set(kwargs).intersection(mandatory_args)
+ kwonly_args = frozenset(getattr(argspec, 'kwonlyargs', []))
+ kwonly_defaults = frozenset(getattr(argspec, 'kwonlydefaults', None) or ())
+
+ # Make sure there are no conflicts between args and kwargs
+ pos_kwargs_conflicts = mandatory_args_matches.intersection(mandatory_kwargs_matches)
+ if pos_kwargs_conflicts:
+ raise ValueError('The following arguments are supplied in both args and kwargs: %s' %
+ ', '.join(pos_kwargs_conflicts))
+
+ # Check that the number of positional arguments minus the number of matched kwargs matches the argspec
+ missing_args = mandatory_args - mandatory_args_matches.union(mandatory_kwargs_matches)
+ if missing_args:
+ raise ValueError('The following arguments are not supplied: %s' % ', '.join(missing_args))
+
+ # Check that the callable can accept the given number of positional arguments
+ if not argspec.varargs and len(args) > len(argspec_args):
+ raise ValueError('The list of positional arguments is longer than the target callable can handle '
+ '(allowed: %d, given in args: %d)' % (len(argspec_args), len(args)))
+
+ # Check that the callable can accept the given keyword arguments
+ if not varkw:
+ unmatched_kwargs = kwargs_set - frozenset(argspec_args).union(kwonly_args)
+ if unmatched_kwargs:
+ raise ValueError('The target callable does not accept the following keyword arguments: %s' %
+ ', '.join(unmatched_kwargs))
+
+ # Check that all keyword-only arguments have been supplied
+ unmatched_kwargs = kwonly_args - kwargs_set - kwonly_defaults
+ if unmatched_kwargs:
+ raise ValueError('The following keyword-only arguments have not been supplied in kwargs: %s' %
+ ', '.join(unmatched_kwargs))
+
+ @abstractmethod
+ def _wakeup(self):
+ """Triggers :meth:`_process_jobs` to be run in an implementation specific manner."""
+
+ def _create_lock(self):
+ return Lock()
+
+ def _current_time(self):
+ return datetime.now(self.timezone)
+
def _run_job(self, job, run_times):
- """
- Acts as a harness that runs the actual job code in a thread.
- """
+ """Acts as a harness that runs the actual job code in the thread pool."""
+
for run_time in run_times:
# See if the job missed its run time window, and handle possible
# misfires accordingly
- difference = datetime.now(self.timezone) - run_time
+ difference = self._current_time() - run_time
grace_time = timedelta(seconds=job.misfire_grace_time)
if difference > grace_time:
# Notify listeners about a missed run
event = JobEvent(EVENT_JOB_MISSED, job, run_time)
self._notify_listeners(event)
- logger.warning('Run time of job "%s" was missed by %s', job, difference)
+ self.logger.warning('Run time of job "%s" was missed by %s', job, difference)
else:
try:
job.add_instance()
except MaxInstancesReachedError:
event = JobEvent(EVENT_JOB_MISSED, job, run_time)
self._notify_listeners(event)
- logger.warning('Execution of job "%s" skipped: maximum number of running instances reached (%d)',
- job, job.max_instances)
+ self.logger.warning(
+ 'Execution of job "%s" skipped: maximum number of running instances reached (%d)', job,
+ job.max_instances)
break
- logger.info('Running job "%s" (scheduled at %s)', job, run_time)
+ self.logger.info('Running job "%s" (scheduled at %s)', job, run_time)
try:
retval = job.func(*job.args, **job.kwargs)
@@ -490,13 +488,13 @@ class Scheduler(object):
event = JobEvent(EVENT_JOB_ERROR, job, run_time, exception=exc, traceback=tb)
self._notify_listeners(event)
- logger.exception('Job "%s" raised an exception', job)
+ self.logger.exception('Job "%s" raised an exception', job)
else:
# Notify listeners about successful execution
event = JobEvent(EVENT_JOB_EXECUTED, job, run_time, retval=retval)
self._notify_listeners(event)
- logger.info('Job "%s" executed successfully', job)
+ self.logger.info('Job "%s" executed successfully', job)
job.remove_instance()
@@ -504,11 +502,15 @@ class Scheduler(object):
if job.coalesce:
break
- def _process_jobs(self, now):
- """
- Iterates through jobs in every jobstore, starts pending jobs and figures out the next wakeup time.
+ def _process_jobs(self):
+ """Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for
+ the next round.
"""
+
+ self.logger.debug('Looking for jobs to run')
+ now = self._current_time()
next_wakeup_time = None
+
with self._jobstores_lock:
for alias, jobstore in iteritems(self._jobstores):
for job in tuple(jobstore.jobs):
@@ -517,10 +519,7 @@ class Scheduler(object):
self._threadpool.submit(self._run_job, job, run_times)
# Increase the job's run count
- if job.coalesce:
- job.runs += 1
- else:
- job.runs += len(run_times)
+ job.runs += 1 if job.coalesce else len(run_times)
# Update the job, but don't keep finished jobs around
if job.compute_next_run_time(now + timedelta(microseconds=1)):
@@ -532,35 +531,13 @@ class Scheduler(object):
next_wakeup_time = job.next_run_time
elif job.next_run_time:
next_wakeup_time = min(next_wakeup_time, job.next_run_time)
- return next_wakeup_time
-
- def _main_loop(self):
- """Executes jobs on schedule."""
-
- logger.info('Scheduler started')
- self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))
- self._wakeup.clear()
- while not self._stopped:
- logger.debug('Looking for jobs to run')
- now = datetime.now(self.timezone)
- next_wakeup_time = self._process_jobs(now)
-
- # Sleep until the next job is scheduled to be run,
- # a new job is added or the scheduler is stopped
- if next_wakeup_time is not None:
- wait_seconds = time_difference(next_wakeup_time, now)
- logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds)
- self._wakeup.wait(wait_seconds)
- self._wakeup.clear()
- elif self.standalone:
- logger.debug('No jobs left; shutting down scheduler')
- self.shutdown()
- break
- else:
- logger.debug('No jobs; waiting until a job is added')
- self._wakeup.wait()
- self._wakeup.clear()
+ # Determine the delay until this method should be called again
+ if next_wakeup_time is not None:
+ wait_seconds = time_difference(next_wakeup_time, now)
+ self.logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds)
+ else:
+ wait_seconds = None
+ self.logger.debug('No jobs; waiting until a job is added')
- logger.info('Scheduler has been shut down')
- self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))
+ return wait_seconds
diff --git a/apscheduler/schedulers/blocking.py b/apscheduler/schedulers/blocking.py
new file mode 100644
index 0000000..a910ccf
--- /dev/null
+++ b/apscheduler/schedulers/blocking.py
@@ -0,0 +1,28 @@
+from __future__ import absolute_import
+from threading import Event
+
+from apscheduler.schedulers.base import BaseScheduler
+
+
+class BlockingScheduler(BaseScheduler):
+ """A scheduler that runs in the foreground. Calling :meth:`start` will block."""
+
+ _event = None
+
+ def start(self):
+ super(BlockingScheduler, self).start()
+ self._event = Event()
+ self._main_loop()
+
+ def shutdown(self, wait=True):
+ super(BlockingScheduler, self).shutdown(wait)
+ self._event.set()
+
+ def _main_loop(self):
+ while self.running:
+ wait_seconds = self._process_jobs()
+ self._event.wait(wait_seconds)
+ self._event.clear()
+
+ def _wakeup(self):
+ self._event.set()
diff --git a/apscheduler/schedulers/gevent.py b/apscheduler/schedulers/gevent.py
new file mode 100644
index 0000000..5962708
--- /dev/null
+++ b/apscheduler/schedulers/gevent.py
@@ -0,0 +1,32 @@
+from __future__ import absolute_import
+
+from gevent.lock import RLock
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.schedulers.base import BaseScheduler
+
+try:
+ from gevent.event import Event
+ import gevent
+except ImportError: # pragma: nocover
+ raise ImportError('GeventScheduler requires gevent installed')
+
+
+class GeventScheduler(BlockingScheduler):
+ """A scheduler that runs as a Gevent greenlet."""
+
+ _greenlet = None
+
+ def start(self):
+ BaseScheduler.start(self)
+ self._event = Event()
+ self._greenlet = gevent.spawn(self._main_loop)
+ return self._greenlet
+
+ def shutdown(self, wait=True):
+ super(GeventScheduler, self).shutdown(wait)
+ self._greenlet.join()
+ del self._greenlet
+
+ def _create_lock(self):
+ return RLock()
diff --git a/apscheduler/schedulers/qt.py b/apscheduler/schedulers/qt.py
new file mode 100644
index 0000000..61174ab
--- /dev/null
+++ b/apscheduler/schedulers/qt.py
@@ -0,0 +1,46 @@
+from __future__ import absolute_import
+
+from apscheduler.schedulers.base import BaseScheduler
+
+try:
+ from PyQt5.QtCore import QObject, QTimer
+except ImportError: # pragma: nocover
+ try:
+ from PyQt4.QtCore import QObject, QTimer
+ except ImportError:
+ try:
+ from PySide.QtCore import QObject, QTimer
+ except ImportError:
+ raise ImportError('QtScheduler requires either PyQt5, PyQt4 or PySide installed')
+
+
+class QtScheduler(BaseScheduler):
+ """A scheduler that runs in a Qt event loop."""
+
+ _timer = None
+
+ def start(self):
+ super(QtScheduler, self).start()
+ self._wakeup()
+
+ def shutdown(self, wait=True):
+ super(QtScheduler, self).shutdown(wait)
+ self._stop_timer()
+
+ def _start_timer(self, wait_seconds):
+ self._stop_timer()
+ if wait_seconds is not None:
+ self._timer = QTimer.singleShot(wait_seconds * 1000, self._process_jobs)
+
+ def _stop_timer(self):
+ if self._timer:
+ if self._timer.isActive():
+ self._timer.stop()
+ del self._timer
+
+ def _wakeup(self):
+ self._start_timer(0)
+
+ def _process_jobs(self):
+ wait_seconds = super(QtScheduler, self)._process_jobs()
+ self._start_timer(wait_seconds)
diff --git a/apscheduler/schedulers/tornado.py b/apscheduler/schedulers/tornado.py
new file mode 100644
index 0000000..f241371
--- /dev/null
+++ b/apscheduler/schedulers/tornado.py
@@ -0,0 +1,54 @@
+from __future__ import absolute_import
+from datetime import timedelta
+from functools import wraps
+
+from apscheduler.schedulers.base import BaseScheduler
+from apscheduler.util import maybe_ref
+
+try:
+ from tornado.ioloop import IOLoop
+except ImportError: # pragma: nocover
+ raise ImportError('TornadoScheduler requires tornado installed')
+
+
+def run_in_ioloop(func):
+ @wraps(func)
+ def wrapper(self, *args, **kwargs):
+ self._ioloop.add_callback(func, self, *args, **kwargs)
+ return wrapper
+
+
+class TornadoScheduler(BaseScheduler):
+ """A scheduler that runs on a Tornado IOLoop."""
+
+ _ioloop = None
+ _timeout = None
+
+ def start(self):
+ super(TornadoScheduler, self).start()
+ self._wakeup()
+
+ @run_in_ioloop
+ def shutdown(self, wait=True):
+ super(TornadoScheduler, self).shutdown(wait)
+ self._stop_timer()
+
+ def _configure(self, config):
+ self._ioloop = maybe_ref(config.pop('io_loop', None)) or IOLoop.current()
+ super(TornadoScheduler, self)._configure(config)
+
+ def _start_timer(self, wait_seconds):
+ self._stop_timer()
+ if wait_seconds is not None:
+ self._timeout = self._ioloop.add_timeout(timedelta(seconds=wait_seconds), self._wakeup)
+
+ def _stop_timer(self):
+ if self._timeout:
+ self._ioloop.remove_timeout(self._timeout)
+ del self._timeout
+
+ @run_in_ioloop
+ def _wakeup(self):
+ self._stop_timer()
+ wait_seconds = self._process_jobs()
+ self._start_timer(wait_seconds)
diff --git a/apscheduler/schedulers/twisted.py b/apscheduler/schedulers/twisted.py
new file mode 100644
index 0000000..2c1e8fb
--- /dev/null
+++ b/apscheduler/schedulers/twisted.py
@@ -0,0 +1,53 @@
+from __future__ import absolute_import
+from functools import wraps
+
+from apscheduler.schedulers.base import BaseScheduler
+from apscheduler.util import maybe_ref
+
+try:
+ from twisted.internet import reactor as default_reactor
+except ImportError: # pragma: nocover
+ raise ImportError('TwistedScheduler requires Twisted installed')
+
+
+def run_in_reactor(func):
+ @wraps(func)
+ def wrapper(self, *args, **kwargs):
+ self._reactor.callFromThread(func, self, *args, **kwargs)
+ return wrapper
+
+
+class TwistedScheduler(BaseScheduler):
+ """A scheduler that runs on a Twisted reactor."""
+
+ _reactor = None
+ _delayedcall = None
+
+ def _configure(self, config):
+ self._reactor = maybe_ref(config.pop('reactor', default_reactor))
+ super(TwistedScheduler, self)._configure(config)
+
+ def start(self):
+ super(TwistedScheduler, self).start()
+ self._wakeup()
+
+ @run_in_reactor
+ def shutdown(self, wait=True):
+ super(TwistedScheduler, self).shutdown(wait)
+ self._stop_timer()
+
+ def _start_timer(self, wait_seconds):
+ self._stop_timer()
+ if wait_seconds is not None:
+ self._delayedcall = self._reactor.callLater(wait_seconds, self._wakeup)
+
+ def _stop_timer(self):
+ if self._delayedcall and self._delayedcall.active():
+ self._delayedcall.cancel()
+ del self._delayedcall
+
+ @run_in_reactor
+ def _wakeup(self):
+ self._stop_timer()
+ wait_seconds = self._process_jobs()
+ self._start_timer(wait_seconds)
diff --git a/apscheduler/util.py b/apscheduler/util.py
index b9b5c2f..163288e 100644
--- a/apscheduler/util.py
+++ b/apscheduler/util.py
@@ -2,15 +2,15 @@
This module contains several handy functions primarily meant for internal use.
"""
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, tzinfo
from time import mktime
import re
from dateutil.tz import gettz
from six import string_types
-__all__ = ('asint', 'asbool', 'convert_to_datetime', 'timedelta_seconds', 'time_difference', 'datetime_ceil',
- 'combine_opts', 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref')
+__all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'timedelta_seconds', 'time_difference',
+ 'datetime_ceil', 'combine_opts', 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref')
def asint(text):
@@ -40,6 +40,21 @@ def asbool(obj):
return bool(obj)
+def astimezone(obj):
+ """
+ Interprets an object as a timezone.
+
+ :rtype: :class:`~datetime.tzinfo`
+ """
+
+ if isinstance(obj, string_types):
+ return gettz(obj)
+ if isinstance(obj, tzinfo):
+ return obj
+ if obj is not None:
+ raise TypeError('Expected tzinfo, got %s instead' % obj.__class__.__name__)
+
+
_DATE_REGEX = re.compile(
r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
r'(?: (?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
diff --git a/docs/index.rst b/docs/index.rst
index e8c09b3..b2a0c23 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -23,18 +23,26 @@ provides features not present in Quartz (such as multiple job stores).
Features
--------
-* No (hard) external dependencies, except for setuptools/distribute
* Thread-safe API
-* Excellent test coverage (tested on CPython 2.6 - 2.7, 3.2 - 3.3, Jython 2.5.3, PyPy 1.9)
+* Excellent test coverage (tested on CPython 2.6 - 2.7, 3.2 - 3.4, PyPy 2.2)
* Configurable scheduling mechanisms (triggers):
* Cron-like scheduling
* Delayed scheduling of single run jobs (like the UNIX "at" command)
* Interval-based (run a job at specified time intervals)
+* Integrates with several frameworks:
+
+ * `asyncio <http://docs.python.org/3.4/library/asyncio.html>`_
+ (`PEP 3156 <http://www.python.org/dev/peps/pep-3156/>`_)
+ * `gevent <http://www.gevent.org/>`_
+ * `Tornado <http://www.tornadoweb.org/>`_
+ * `Twisted <http://twistedmatrix.com/>`_
+ * `Qt <http://qt-project.org/>`_ (using either `PyQt <http://www.riverbankcomputing.com/software/pyqt/intro>`_
+ or `PySide <http://qt-project.org/wiki/PySide>`_)
* Multiple, simultaneously active job stores:
- * RAM
- * File-based simple database (:py:mod:`shelve`)
+ * Memory
+ * File-based simple database (shelve)
* `SQLAlchemy <http://www.sqlalchemy.org/>`_ (any supported RDBMS works)
* `MongoDB <http://www.mongodb.org/>`_
* `Redis <http://redis.io/>`_
diff --git a/examples/persistent.py b/examples/persistent.py
index b75b778..1e1a155 100644
--- a/examples/persistent.py
+++ b/examples/persistent.py
@@ -6,8 +6,8 @@ You can exit the program, restart it and observe that any previous alarms that h
from datetime import datetime, timedelta
-from apscheduler.scheduler import Scheduler
-from apscheduler.jobstores.shelve_store import ShelveJobStore
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.jobstores.shelve import ShelveJobStore
def alarm(time):
@@ -15,12 +15,13 @@ def alarm(time):
if __name__ == '__main__':
- scheduler = Scheduler(standalone=True)
+ scheduler = BlockingScheduler()
scheduler.add_jobstore(ShelveJobStore('example.db'), 'shelve')
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_job(alarm, 'simple', [alarm_time], jobstore='shelve', args=[datetime.now()])
print('To clear the alarms, delete the example.db file.')
print('Press Ctrl+C to exit')
+
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
diff --git a/examples/reference.py b/examples/reference.py
index dc9756c..2b0c414 100644
--- a/examples/reference.py
+++ b/examples/reference.py
@@ -2,13 +2,14 @@
Basic example showing how to schedule a callable using a textual reference.
"""
-from apscheduler.scheduler import Scheduler
+from apscheduler.schedulers.blocking import BlockingScheduler
if __name__ == '__main__':
- scheduler = Scheduler(standalone=True)
+ scheduler = BlockingScheduler()
scheduler.add_job('sys:stdout.write', 'interval', {'seconds': 3}, args=['tick\n'])
print('Press Ctrl+C to exit')
+
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
diff --git a/examples/schedulers/asyncio_.py b/examples/schedulers/asyncio_.py
new file mode 100644
index 0000000..01cc138
--- /dev/null
+++ b/examples/schedulers/asyncio_.py
@@ -0,0 +1,25 @@
+"""
+Demonstrates how to use the Tornado compatible scheduler to schedule a job that executes on 3 second intervals.
+"""
+
+from datetime import datetime
+import asyncio
+
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+
+
+def tick():
+ print('Tick! The time is: %s' % datetime.now())
+
+
+if __name__ == '__main__':
+ scheduler = AsyncIOScheduler()
+ scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.start()
+ print('Press Ctrl+C to exit')
+
+ # Execution will block here until Ctrl+C is pressed.
+ try:
+ asyncio.get_event_loop().run_forever()
+ except (KeyboardInterrupt, SystemExit):
+ pass
diff --git a/examples/schedulers/background.py b/examples/schedulers/background.py
new file mode 100644
index 0000000..8a3dde5
--- /dev/null
+++ b/examples/schedulers/background.py
@@ -0,0 +1,26 @@
+"""
+Demonstrates how to use the background scheduler to schedule a job that executes on 3 second intervals.
+"""
+
+from datetime import datetime
+import time
+
+from apscheduler.schedulers.background import BackgroundScheduler
+
+
+def tick():
+ print('Tick! The time is: %s' % datetime.now())
+
+
+if __name__ == '__main__':
+ scheduler = BackgroundScheduler()
+ scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.start()
+ print('Press Ctrl+C to exit')
+
+ try:
+ # This is here to simulate application activity (which keeps the main thread alive).
+ while True:
+ time.sleep(2)
+ except (KeyboardInterrupt, SystemExit):
+ scheduler.shutdown() # Not strictly necessary if daemonic mode is enabled but should be done if possible
diff --git a/examples/interval.py b/examples/schedulers/blocking.py
index 5469fef..55d1385 100644
--- a/examples/interval.py
+++ b/examples/schedulers/blocking.py
@@ -1,10 +1,10 @@
"""
-Basic example showing how to start the scheduler and schedule a job that executes on 3 second intervals.
+Demonstrates how to use the blocking scheduler to schedule a job that executes on 3 second intervals.
"""
from datetime import datetime
-from apscheduler.scheduler import Scheduler
+from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
@@ -12,9 +12,10 @@ def tick():
if __name__ == '__main__':
- scheduler = Scheduler(standalone=True)
+ scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', {'seconds': 3})
print('Press Ctrl+C to exit')
+
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
diff --git a/examples/schedulers/gevent_.py b/examples/schedulers/gevent_.py
new file mode 100644
index 0000000..597aa4d
--- /dev/null
+++ b/examples/schedulers/gevent_.py
@@ -0,0 +1,24 @@
+"""
+Demonstrates how to use the gevent compatible scheduler to schedule a job that executes on 3 second intervals.
+"""
+
+from datetime import datetime
+
+from apscheduler.schedulers.gevent import GeventScheduler
+
+
+def tick():
+ print('Tick! The time is: %s' % datetime.now())
+
+
+if __name__ == '__main__':
+ scheduler = GeventScheduler()
+ scheduler.add_job(tick, 'interval', {'seconds': 3})
+ g = scheduler.start() # g is the greenlet that runs the scheduler loop
+ print('Press Ctrl+C to exit')
+
+ # Execution will block here until Ctrl+C is pressed.
+ try:
+ g.join()
+ except (KeyboardInterrupt, SystemExit):
+ pass
diff --git a/examples/schedulers/qt.py b/examples/schedulers/qt.py
new file mode 100644
index 0000000..15b9192
--- /dev/null
+++ b/examples/schedulers/qt.py
@@ -0,0 +1,37 @@
+"""
+Demonstrates how to use the Qt compatible scheduler to schedule a job that executes on 3 second intervals.
+"""
+
+from datetime import datetime
+import signal
+import sys
+
+from apscheduler.schedulers.qt import QtScheduler
+
+try:
+ from PyQt5.QtWidgets import QApplication, QLabel
+except ImportError:
+ try:
+ from PyQt4.QtGui import QApplication, QLabel
+ except ImportError:
+ from PySide.QtGui import QApplication, QLabel
+
+
+def tick():
+ label.setText('Tick! The time is: %s' % datetime.now())
+
+
+if __name__ == '__main__':
+ app = QApplication(sys.argv)
+ signal.signal(signal.SIGINT, lambda *args: QApplication.quit()) # This enables processing of Ctrl+C keypresses
+ label = QLabel('The timer text will appear here in a moment!')
+ label.setWindowTitle('QtScheduler example')
+ label.setFixedSize(280, 50)
+ label.show()
+
+ scheduler = QtScheduler()
+ scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.start()
+
+ # Execution will block here until the user closes the windows or Ctrl+C is pressed.
+ app.exec_()
diff --git a/examples/schedulers/tornado_.py b/examples/schedulers/tornado_.py
new file mode 100644
index 0000000..42ddab5
--- /dev/null
+++ b/examples/schedulers/tornado_.py
@@ -0,0 +1,25 @@
+"""
+Demonstrates how to use the Tornado compatible scheduler to schedule a job that executes on 3 second intervals.
+"""
+
+from datetime import datetime
+
+from tornado.ioloop import IOLoop
+from apscheduler.schedulers.tornado import TornadoScheduler
+
+
+def tick():
+ print('Tick! The time is: %s' % datetime.now())
+
+
+if __name__ == '__main__':
+ scheduler = TornadoScheduler()
+ scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.start()
+ print('Press Ctrl+C to exit')
+
+ # Execution will block here until Ctrl+C is pressed.
+ try:
+ IOLoop.instance().start()
+ except (KeyboardInterrupt, SystemExit):
+ pass
diff --git a/examples/schedulers/twisted_.py b/examples/schedulers/twisted_.py
new file mode 100644
index 0000000..a1f7358
--- /dev/null
+++ b/examples/schedulers/twisted_.py
@@ -0,0 +1,25 @@
+"""
+Demonstrates how to use the Twisted compatible scheduler to schedule a job that executes on 3 second intervals.
+"""
+
+from datetime import datetime
+
+from twisted.internet import reactor
+from apscheduler.schedulers.twisted import TwistedScheduler
+
+
+def tick():
+ print('Tick! The time is: %s' % datetime.now())
+
+
+if __name__ == '__main__':
+ scheduler = TwistedScheduler()
+ scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.start()
+ print('Press Ctrl+C to exit')
+
+ # Execution will block here until Ctrl+C is pressed.
+ try:
+ reactor.run()
+ except (KeyboardInterrupt, SystemExit):
+ pass
diff --git a/examples/threaded.py b/examples/threaded.py
deleted file mode 100644
index 7a11937..0000000
--- a/examples/threaded.py
+++ /dev/null
@@ -1,25 +0,0 @@
-"""
-Basic example showing how the scheduler integrates with the application it's running alongside with.
-"""
-
-from datetime import datetime
-import time
-
-from apscheduler.scheduler import Scheduler
-
-
-def tick():
- print('Tick! The time is: %s' % datetime.now())
-
-
-if __name__ == '__main__':
- scheduler = Scheduler()
- scheduler.add_job(tick, 'interval', {'seconds': 3})
- print('Press Ctrl+C to exit')
- scheduler.start()
-
- # This is here to simulate application activity (which keeps the main
- # thread alive).
- while True:
- print('This is the main thread.')
- time.sleep(2)
diff --git a/tests/conftest.py b/tests/conftest.py
index f2dc475..7c4f70b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -10,6 +10,8 @@ from apscheduler.jobstores.shelve import ShelveJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.redis import RedisJobStore
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.schedulers.blocking import BlockingScheduler
@pytest.fixture
diff --git a/tests/test_integration.py b/tests/test_integration.py
deleted file mode 100644
index 986f0c6..0000000
--- a/tests/test_integration.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from time import sleep
-
-import pytest
-
-from apscheduler.scheduler import Scheduler
-from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
-from tests.conftest import all_jobstores, all_jobstores_ids
-
-
-def increment(vals, sleeptime):
- vals[0] += 1
- sleep(sleeptime)
-
-
-@pytest.fixture(params=all_jobstores, ids=all_jobstores_ids)
-def sched(request):
- jobstore = request.param(request)
- sched = Scheduler()
- sched.add_jobstore(jobstore, 'persistent')
- sched.start()
- request.addfinalizer(sched.shutdown)
- return sched
-
-
-def test_overlapping_runs(sched):
- # Makes sure that "increment" is only ran once, since it will still be
- # running when the next appointed time hits.
-
- vals = [0]
- sched.add_job(increment, 'interval', {'seconds': 1}, args=[vals, 2], jobstore='persistent')
- sleep(2.5)
- assert vals == [1]
-
-
-def test_max_instances(sched):
- vals = [0]
- events = []
- sched.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
- sched.add_job(increment, 'interval', {'seconds': 0.3}, max_instances=2, max_runs=4, args=[vals, 1],
- jobstore='persistent')
- sleep(2.4)
- assert vals == [2]
- assert len(events) == 4
- assert events[0].code == EVENT_JOB_MISSED
- assert events[1].code == EVENT_JOB_MISSED
- assert events[2].code == EVENT_JOB_EXECUTED
- assert events[3].code == EVENT_JOB_EXECUTED
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
deleted file mode 100644
index 759c73e..0000000
--- a/tests/test_scheduler.py
+++ /dev/null
@@ -1,465 +0,0 @@
-from datetime import datetime, timedelta
-from logging import StreamHandler, ERROR
-from io import StringIO
-from copy import copy
-import os
-
-from dateutil.tz import tzoffset
-import pytest
-
-from apscheduler.jobstores.memory import MemoryJobStore
-from apscheduler.scheduler import Scheduler, SchedulerAlreadyRunningError
-from apscheduler.job import Job
-from apscheduler.events import (EVENT_JOB_EXECUTED, SchedulerEvent, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN,
- EVENT_JOB_MISSED)
-from apscheduler import scheduler
-from tests.conftest import minpython
-
-
-local_tz = tzoffset('DUMMYTZ', 3600)
-
-
-class FakeThread(object):
- def isAlive(self):
- return True
-
-
-class FakeThreadPool(object):
- def submit(self, func, *args, **kwargs):
- func(*args, **kwargs)
-
-
-class DummyException(Exception):
- pass
-
-
-class FakeDateTime(datetime):
- original_now = datetime(2011, 4, 3, 18, 40, tzinfo=local_tz)
- _now = original_now
-
- @classmethod
- def now(cls, timezone=None):
- return cls._now
-
-
-class TestOfflineScheduler(object):
- @pytest.fixture()
- def sched(self, request):
- def finish():
- if sched.running:
- sched.shutdown()
-
- sched = Scheduler()
- request.addfinalizer(finish)
- return sched
-
- def test_jobstore_twice(self, sched):
- with pytest.raises(KeyError):
- sched.add_jobstore(MemoryJobStore(), 'dummy')
- sched.add_jobstore(MemoryJobStore(), 'dummy')
-
- def test_add_tentative_job(self, sched):
- job = sched.add_job(lambda: None, 'date', [datetime(2200, 7, 24)], jobstore='dummy')
- assert isinstance(job, Job)
- assert sched.get_jobs() == []
-
- def test_add_job_by_reference(self, sched):
- job = sched.add_job('copy:copy', 'date', [datetime(2200, 7, 24)], args=[()])
- assert job.func == copy
- assert job.func_ref == 'copy:copy'
-
- def test_configure_jobstore(self, sched):
- conf = {'apscheduler.jobstore.memstore.class': 'apscheduler.jobstores.memory:MemoryJobStore'}
- sched.configure(conf)
- sched.remove_jobstore('memstore')
-
- def test_shutdown_offline(self, sched):
- sched.shutdown()
-
- def test_configure_no_prefix(self, sched):
- global_options = {'misfire_grace_time': '2', 'daemonic': 'false'}
- sched.configure(global_options)
- assert sched.misfire_grace_time == 1
- assert sched.daemonic is True
-
- def test_configure_prefix(self, sched):
- global_options = {'apscheduler.misfire_grace_time': 2, 'apscheduler.daemonic': False}
- sched.configure(global_options)
- assert sched.misfire_grace_time == 2
- assert sched.daemonic is False
-
- def test_add_listener(self, sched):
- val = []
- sched.add_listener(val.append)
-
- event = SchedulerEvent(EVENT_SCHEDULER_START)
- sched._notify_listeners(event)
- assert len(val) == 1
- assert val[0] == event
-
- event = SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)
- sched._notify_listeners(event)
- assert len(val) == 2
- assert val[1] == event
-
- sched.remove_listener(val.append)
- sched._notify_listeners(event)
- assert len(val) == 2
-
- def test_pending_jobs(self, sched):
- # Tests that pending jobs are properly added to the jobs list when
- # the scheduler is started (and not before!)
- sched.add_job(lambda: None, 'date', [datetime(9999, 9, 9)])
- assert sched.get_jobs() == []
-
- sched.start()
- jobs = sched.get_jobs()
- assert len(jobs) == 1
-
- def test_invalid_callable_args(self, sched):
- """Tests that attempting to schedule a job with an invalid number of arguments raises an exception."""
-
- with pytest.raises(ValueError) as error:
- sched.add_job(lambda x: None, 'date', [datetime(9999, 9, 9)], args=[1, 2])
-
- assert str(error.value) == ('The list of positional arguments is longer than the target callable can handle '
- '(allowed: 1, given in args: 2)')
-
- def test_invalid_callable_kwargs(self, sched):
- """Tests that attempting to schedule a job with unmatched keyword arguments raises an exception."""
-
- with pytest.raises(ValueError) as error:
- sched.add_job(lambda x: None, 'date', [datetime(9999, 9, 9)], kwargs={'x': 0, 'y': 1})
-
- assert str(error.value) == 'The target callable does not accept the following keyword arguments: y'
-
- def test_missing_callable_args(self, sched):
- """Tests that attempting to schedule a job with missing arguments raises an exception."""
-
- with pytest.raises(ValueError) as error:
- sched.add_job(lambda x, y, z: None, 'date', [datetime(9999, 9, 9)], args=[1], kwargs={'y': 0})
-
- assert str(error.value) == 'The following arguments are not supplied: z'
-
- def test_conflicting_callable_args(self, sched):
- """Tests that attempting to schedule a job where the combination of args and kwargs are in conflict raises an
- exception."""
-
- with pytest.raises(ValueError) as error:
- sched.add_job(lambda x, y: None, 'date', [datetime(9999, 9, 9)], args=[1, 2], kwargs={'y': 1})
-
- assert str(error.value) == 'The following arguments are supplied in both args and kwargs: y'
-
- @minpython(3)
- def test_unfulfilled_kwargs(self, sched):
- """Tests that attempting to schedule a job where not all keyword-only arguments are fulfilled raises an
- exception."""
-
- func = eval("lambda x, *, y, z=1: None")
- with pytest.raises(ValueError) as error:
- sched.add_job(func, 'date', [datetime(9999, 9, 9)], args=[1])
-
- assert str(error.value) == 'The following keyword-only arguments have not been supplied in kwargs: y'
-
-
-class TestJobExecution(object):
- @pytest.fixture
- def sched(self):
- sched = Scheduler(threadpool=FakeThreadPool(), timezone=local_tz)
- sched.add_jobstore(MemoryJobStore(), 'default')
-
- # Make the scheduler think it's running
- sched._thread = FakeThread()
- return sched
-
- @pytest.fixture
- def logstream(self, request):
- stream = StringIO()
- loghandler = StreamHandler(stream)
- loghandler.setLevel(ERROR)
- scheduler.logger.addHandler(loghandler)
- request.addfinalizer(lambda: scheduler.logger.removeHandler(loghandler))
- return stream
-
- @pytest.fixture
- def fake_datetime(self, request, monkeypatch):
- monkeypatch.setattr(scheduler, 'datetime', FakeDateTime)
- request.addfinalizer(lambda: setattr(FakeDateTime, '_now', FakeDateTime.original_now))
-
- def test_job_name(self, sched):
- def my_job():
- pass
-
- job = sched.add_job(my_job, 'interval', {'start_date': datetime(2010, 5, 19)})
- assert (repr(job) ==
- "<Job (name=my_job, trigger=<IntervalTrigger (interval=datetime.timedelta(0, 1), "
- "start_date='2010-05-19 00:00:00 DUMMYTZ')>)>")
-
- def test_schedule_object(self, sched):
- # Tests that any callable object is accepted (and not just functions)
- class A:
- def __init__(self):
- self.val = 0
-
- def __call__(self):
- self.val += 1
-
- a = A()
- job = sched.add_job(a, 'interval', {'seconds': 1})
- sched._process_jobs(job.next_run_time)
- sched._process_jobs(job.next_run_time)
- assert a.val == 2
-
- def test_schedule_method(self, sched):
- # Tests that bound methods can be scheduled (at least with RAMJobStore)
- class A:
- def __init__(self):
- self.val = 0
-
- def method(self):
- self.val += 1
-
- a = A()
- job = sched.add_job(a.method, 'interval', {'seconds': 1})
- sched._process_jobs(job.next_run_time)
- sched._process_jobs(job.next_run_time)
- assert a.val == 2
-
- def test_unschedule_job(self, sched):
- def increment():
- vals[0] += 1
-
- vals = [0]
- job = sched.add_job(increment, 'cron')
- sched._process_jobs(job.next_run_time)
- assert vals[0] == 1
- sched.unschedule_job(job)
- sched._process_jobs(job.next_run_time)
- assert vals[0] == 1
-
- def test_unschedule_func(self, sched):
- def increment():
- vals[0] += 1
-
- def increment2():
- vals[0] += 1
-
- vals = [0]
- job1 = sched.add_job(increment, 'cron')
- job2 = sched.add_job(increment2, 'cron')
- job3 = sched.add_job(increment, 'cron')
- assert sched.get_jobs() == [job1, job2, job3]
-
- sched.unschedule_func(increment)
- assert sched.get_jobs() == [job2]
-
- def test_unschedule_func_notfound(self, sched):
- pytest.raises(KeyError, sched.unschedule_func, copy)
-
- def test_job_finished(self, sched):
- def increment():
- vals[0] += 1
-
- vals = [0]
- job = sched.add_job(increment, 'interval', max_runs=1)
- sched._process_jobs(job.next_run_time)
- assert vals == [1]
- assert job not in sched.get_jobs()
-
- def test_job_exception(self, sched, logstream):
- def failure():
- raise DummyException
-
- job = sched.add_job(failure, 'date', [datetime(9999, 9, 9)])
- sched._process_jobs(job.next_run_time)
- assert 'DummyException' in logstream.getvalue()
-
- def test_misfire_grace_time(self, sched):
- sched.misfire_grace_time = 3
- job = sched.add_job(lambda: None, 'interval', {'seconds': 1})
- assert job.misfire_grace_time == 3
-
- job = sched.add_job(lambda: None, 'interval', {'seconds': 1}, misfire_grace_time=2)
- assert job.misfire_grace_time == 2
-
- def test_coalesce_on(self, sched, fake_datetime):
- # Makes sure that the job is only executed once when it is scheduled
- # to be executed twice in a row
- def increment():
- vals[0] += 1
-
- vals = [0]
- events = []
- sched.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
- job = sched.add_job(increment, 'interval', {'seconds': 1, 'start_date': FakeDateTime.now()},
- coalesce=True, misfire_grace_time=2)
-
- # Turn the clock 14 seconds forward
- FakeDateTime._now += timedelta(seconds=2)
-
- sched._process_jobs(FakeDateTime.now())
- assert job.runs == 1
- assert len(events) == 1
- assert events[0].code == EVENT_JOB_EXECUTED
- assert vals == [1]
-
- def test_coalesce_off(self, sched, fake_datetime):
- # Makes sure that every scheduled run for the job is executed even
- # when they are in the past (but still within misfire_grace_time)
- def increment():
- vals[0] += 1
-
- vals = [0]
- events = []
- sched.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
- job = sched.add_job(increment, 'interval', {'seconds': 1, 'start_date': FakeDateTime.now()},
- coalesce=False, misfire_grace_time=2)
-
- # Turn the clock 2 seconds forward
- FakeDateTime._now += timedelta(seconds=2)
-
- sched._process_jobs(FakeDateTime.now())
- assert job.runs == 3
- assert len(events) == 3
- assert events[0].code == EVENT_JOB_EXECUTED
- assert events[1].code == EVENT_JOB_EXECUTED
- assert events[2].code == EVENT_JOB_EXECUTED
- assert vals == [3]
-
- def test_interval(self, sched):
- def increment(amount):
- vals[0] += amount
- vals[1] += 1
-
- vals = [0, 0]
- job = sched.add_job(increment, 'interval', {'seconds': 1}, args=[2])
- sched._process_jobs(job.next_run_time)
- sched._process_jobs(job.next_run_time)
- assert vals == [4, 2]
-
- def test_interval_schedule(self, sched):
- @sched.scheduled_job('interval', {'seconds': 1})
- def increment():
- vals[0] += 1
-
- vals = [0]
- start = increment.job.next_run_time
- sched._process_jobs(start)
- sched._process_jobs(start + timedelta(seconds=1))
- assert vals == [2]
-
- def test_cron(self, sched):
- def increment(amount):
- vals[0] += amount
- vals[1] += 1
-
- vals = [0, 0]
- job = sched.add_job(increment, 'cron', args=[3])
- start = job.next_run_time
- sched._process_jobs(start)
- assert vals == [3, 1]
- sched._process_jobs(start + timedelta(seconds=1))
- assert vals == [6, 2]
- sched._process_jobs(start + timedelta(seconds=2))
- assert vals == [9, 3]
-
- def test_cron_schedule_1(self, sched):
- @sched.scheduled_job('cron')
- def increment():
- vals[0] += 1
-
- vals = [0]
- start = increment.job.next_run_time
- sched._process_jobs(start)
- sched._process_jobs(start + timedelta(seconds=1))
- assert vals[0] == 2
-
- def test_cron_schedule_2(self, sched):
- @sched.scheduled_job('cron', {'minute': '*'})
- def increment():
- vals[0] += 1
-
- vals = [0]
- start = increment.job.next_run_time
- next_run = start + timedelta(seconds=60)
- assert increment.job.get_run_times(next_run) == [start, next_run]
- sched._process_jobs(start)
- sched._process_jobs(next_run)
- assert vals[0] == 2
-
- def test_date(self, sched):
- def append_val(value):
- vals.append(value)
-
- vals = []
- date = datetime.now(local_tz) + timedelta(seconds=1)
- sched.add_job(append_val, 'date', [date], kwargs={'value': 'test'})
- sched._process_jobs(date)
- assert vals == ['test']
-
- def test_print_jobs(self, sched):
- out = StringIO()
- sched.print_jobs(out)
- expected = 'Jobstore default:%s'\
- ' No scheduled jobs%s' % (os.linesep, os.linesep)
- assert out.getvalue() == expected
-
- sched.add_job(copy, 'date', [datetime(2200, 5, 19)], args=[()])
- out = StringIO()
- sched.print_jobs(out)
- expected = 'Jobstore default:%s '\
- 'copy (trigger: date[2200-05-19 00:00:00 DUMMYTZ], '\
- 'next run at: 2200-05-19 00:00:00 DUMMYTZ)%s' % (os.linesep, os.linesep)
- assert out.getvalue() == expected
-
- def test_jobstore(self, sched):
- sched.add_jobstore(MemoryJobStore(), 'dummy')
- job = sched.add_job(lambda: None, 'date', [datetime(2200, 7, 24)], jobstore='dummy')
- assert sched.get_jobs() == [job]
- sched.remove_jobstore('dummy')
- assert sched.get_jobs() == []
-
- def test_remove_nonexistent_jobstore(self, sched):
- pytest.raises(KeyError, sched.remove_jobstore, 'dummy2')
-
- def test_job_next_run_time(self, sched):
- # Tests against bug #5
- def increment():
- vars[0] += 1
-
- vars = [0]
- scheduler.datetime = FakeDateTime
- job = sched.add_job(increment, 'interval', {'seconds': 1, 'start_date': FakeDateTime.now()},
- misfire_grace_time=3)
- start = job.next_run_time
-
- sched._process_jobs(start)
- assert vars == [1]
-
- sched._process_jobs(start)
- assert vars == [1]
-
- sched._process_jobs(start + timedelta(seconds=1))
- assert vars == [2]
-
-
-class TestRunningScheduler(object):
- @pytest.fixture
- def sched(self, request):
- sched = Scheduler()
- sched.start()
- request.addfinalizer(sched.shutdown)
- return sched
-
- def test_shutdown_timeout(self, sched):
- sched.shutdown()
-
- def test_scheduler_double_start(self, sched):
- pytest.raises(SchedulerAlreadyRunningError, sched.start)
-
- def test_scheduler_configure_running(self, sched):
- pytest.raises(SchedulerAlreadyRunningError, sched.configure, {})
-
- def test_scheduler_double_shutdown(self, sched):
- sched.shutdown()
- sched.shutdown(False)
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
new file mode 100644
index 0000000..0e9ef5f
--- /dev/null
+++ b/tests/test_schedulers.py
@@ -0,0 +1,577 @@
+from datetime import datetime, timedelta
+from logging import StreamHandler, ERROR
+from io import StringIO
+from copy import copy
+from threading import Event, Thread
+from time import sleep
+import os
+
+from dateutil.tz import tzoffset
+import pytest
+
+from apscheduler.jobstores.memory import MemoryJobStore
+from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.schedulers.base import BaseScheduler
+from apscheduler.job import Job
+from apscheduler.events import (SchedulerEvent, EVENT_JOB_EXECUTED, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN,
+ EVENT_JOB_MISSED, EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_JOB_ADDED,
+ EVENT_JOBSTORE_JOB_REMOVED)
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.threadpool import ThreadPool
+from tests.conftest import minpython
+
+dummy_tz = tzoffset('DUMMYTZ', 3600)
+dummy_datetime = datetime(2011, 4, 3, 18, 40, tzinfo=dummy_tz)
+
+
+class DummyThreadPool(object):
+ def submit(self, func, *args, **kwargs):
+ func(*args, **kwargs)
+
+ def shutdown(self, wait):
+ pass
+
+
+class DummyException(Exception):
+ pass
+
+
+class DummyScheduler(BaseScheduler):
+ def __init__(self, gconfig={}, **options):
+ super(DummyScheduler, self).__init__(gconfig, timezone=dummy_tz, threadpool=DummyThreadPool(), **options)
+ self.now = dummy_datetime
+
+ def start(self):
+ super(DummyScheduler, self).start()
+
+ def shutdown(self, wait=True):
+ super(DummyScheduler, self).shutdown()
+
+ def _wakeup(self):
+ pass
+
+ def _current_time(self):
+ return self.now
+
+
+def increment(vals):
+ vals[0] += 1
+
+
+@pytest.fixture
+def scheduler(request):
+ sched = DummyScheduler()
+ if 'start_scheduler' in request.keywords:
+ sched.start()
+ request.addfinalizer(lambda: sched.shutdown() if sched.running else None)
+
+ return sched
+
+
+@pytest.fixture
+def logstream(request, scheduler):
+ stream = StringIO()
+ loghandler = StreamHandler(stream)
+ loghandler.setLevel(ERROR)
+ scheduler.logger.addHandler(loghandler)
+ request.addfinalizer(lambda: scheduler.logger.removeHandler(loghandler))
+ return stream
+
+
+class TestOfflineScheduler(object):
+ def test_jobstore_twice(self, scheduler):
+ with pytest.raises(KeyError):
+ scheduler.add_jobstore(MemoryJobStore(), 'dummy')
+ scheduler.add_jobstore(MemoryJobStore(), 'dummy')
+
+ def test_add_tentative_job(self, scheduler):
+ job = scheduler.add_job(lambda: None, 'date', [datetime(2200, 7, 24)], jobstore='dummy')
+ assert isinstance(job, Job)
+ assert scheduler.get_jobs() == []
+
+ def test_add_job_by_reference(self, scheduler):
+ job = scheduler.add_job('copy:copy', 'date', [datetime(2200, 7, 24)], args=[()])
+ assert job.func == copy
+ assert job.func_ref == 'copy:copy'
+
+ def test_configure_jobstore(self, scheduler):
+ conf = {'apscheduler.jobstore.memstore.class': 'apscheduler.jobstores.memory:MemoryJobStore'}
+ scheduler.configure(conf)
+ scheduler.remove_jobstore('memstore')
+
+ def test_shutdown_offline(self, scheduler):
+ pytest.raises(SchedulerNotRunningError, scheduler.shutdown)
+
+ def test_configure_no_prefix(self, scheduler):
+ global_options = {'misfire_grace_time': '2', 'coalesce': 'false'}
+ scheduler.configure(global_options)
+ assert scheduler.misfire_grace_time == 1
+ assert scheduler.coalesce is True
+
+ def test_configure_prefix(self, scheduler):
+ global_options = {'apscheduler.misfire_grace_time': 2, 'apscheduler.coalesce': False}
+ scheduler.configure(global_options)
+ assert scheduler.misfire_grace_time == 2
+ assert scheduler.coalesce is False
+
+ def test_add_listener(self, scheduler):
+ val = []
+ scheduler.add_listener(val.append)
+
+ event = SchedulerEvent(EVENT_SCHEDULER_START)
+ scheduler._notify_listeners(event)
+ assert len(val) == 1
+ assert val[0] == event
+
+ event = SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)
+ scheduler._notify_listeners(event)
+ assert len(val) == 2
+ assert val[1] == event
+
+ scheduler.remove_listener(val.append)
+ scheduler._notify_listeners(event)
+ assert len(val) == 2
+
+ def test_pending_jobs(self, scheduler):
+ """Tests that pending jobs are properly added to the jobs list, but only when the scheduler is started."""
+
+ scheduler.add_job(lambda: None, 'date', [datetime(9999, 9, 9)])
+ assert scheduler.get_jobs() == []
+
+ scheduler.start()
+ jobs = scheduler.get_jobs()
+ assert len(jobs) == 1
+
+ def test_invalid_callable_args(self, scheduler):
+ """Tests that attempting to schedule a job with an invalid number of arguments raises an exception."""
+
+ with pytest.raises(ValueError) as error:
+ scheduler.add_job(lambda x: None, 'date', [datetime(9999, 9, 9)], args=[1, 2])
+
+ assert str(error.value) == ('The list of positional arguments is longer than the target callable can handle '
+ '(allowed: 1, given in args: 2)')
+
+ def test_invalid_callable_kwargs(self, scheduler):
+ """Tests that attempting to schedule a job with unmatched keyword arguments raises an exception."""
+
+ with pytest.raises(ValueError) as error:
+ scheduler.add_job(lambda x: None, 'date', [datetime(9999, 9, 9)], kwargs={'x': 0, 'y': 1})
+
+ assert str(error.value) == 'The target callable does not accept the following keyword arguments: y'
+
+ def test_missing_callable_args(self, scheduler):
+ """Tests that attempting to schedule a job with missing arguments raises an exception."""
+
+ with pytest.raises(ValueError) as error:
+ scheduler.add_job(lambda x, y, z: None, 'date', [datetime(9999, 9, 9)], args=[1], kwargs={'y': 0})
+
+ assert str(error.value) == 'The following arguments are not supplied: z'
+
+ def test_conflicting_callable_args(self, scheduler):
+ """Tests that attempting to schedule a job where the combination of args and kwargs are in conflict raises an
+ exception."""
+
+ with pytest.raises(ValueError) as error:
+ scheduler.add_job(lambda x, y: None, 'date', [datetime(9999, 9, 9)], args=[1, 2], kwargs={'y': 1})
+
+ assert str(error.value) == 'The following arguments are supplied in both args and kwargs: y'
+
+ @minpython(3)
+ def test_unfulfilled_kwargs(self, scheduler):
+ """Tests that attempting to schedule a job where not all keyword-only arguments are fulfilled raises an
+ exception."""
+
+ func = eval("lambda x, *, y, z=1: None")
+ with pytest.raises(ValueError) as error:
+ scheduler.add_job(func, 'date', [datetime(9999, 9, 9)], args=[1])
+
+ assert str(error.value) == 'The following keyword-only arguments have not been supplied in kwargs: y'
+
+
+@pytest.mark.start_scheduler
+class TestJobExecution(object):
+ def test_job_name(self, scheduler):
+ def my_job():
+ pass
+
+ job = scheduler.add_job(my_job, 'interval', {'start_date': datetime(2010, 5, 19)})
+ assert (repr(job) ==
+ "<Job (name=my_job, trigger=<IntervalTrigger (interval=datetime.timedelta(0, 1), "
+ "start_date='2010-05-19 00:00:00 DUMMYTZ')>)>")
+
+ def test_schedule_object(self, scheduler):
+ """Tests that any callable object is accepted (and not just functions)."""
+
+ class A(object):
+ def __init__(self):
+ self.val = 0
+
+ def __call__(self):
+ self.val += 1
+
+ a = A()
+ job = scheduler.add_job(a, 'interval', {'seconds': 1})
+ scheduler.now = job.next_run_time
+ scheduler._process_jobs()
+ assert a.val == 1
+
+ def test_schedule_method(self, scheduler):
+ """Tests that bound methods can be scheduled (at least with MemoryJobStore)."""
+
+ class A:
+ def __init__(self):
+ self.val = 0
+
+ def method(self):
+ self.val += 1
+
+ a = A()
+ job = scheduler.add_job(a.method, 'interval', {'seconds': 1})
+ scheduler.now = job.next_run_time
+ scheduler._process_jobs()
+ assert a.val == 1
+
+ def test_unschedule_job(self, scheduler):
+ vals = [0]
+ job = scheduler.add_job(increment, 'cron', args=(vals,))
+ scheduler.now = job.next_run_time
+ scheduler._process_jobs()
+ assert vals[0] == 1
+
+ scheduler.unschedule_job(job)
+ scheduler._process_jobs()
+ assert vals[0] == 1
+
+ def test_unschedule_func(self, scheduler):
+ vals = [0]
+ job1 = scheduler.add_job(increment, 'cron', args=(vals,))
+ job2 = scheduler.add_job(lambda: None, 'cron')
+ job3 = scheduler.add_job(increment, 'cron', args=(vals,))
+ assert scheduler.get_jobs() == [job1, job2, job3]
+
+ scheduler.unschedule_func(increment)
+ assert scheduler.get_jobs() == [job2]
+
+ def test_unschedule_func_notfound(self, scheduler):
+ pytest.raises(KeyError, scheduler.unschedule_func, copy)
+
+ def test_job_finished(self, scheduler):
+ vals = [0]
+ job = scheduler.add_job(increment, 'interval', args=(vals,), max_runs=1)
+ scheduler.now = job.next_run_time
+ scheduler._process_jobs()
+ assert vals == [1]
+ assert job not in scheduler.get_jobs()
+
+ def test_job_exception(self, scheduler, logstream):
+ def failure():
+ raise DummyException
+
+ job = scheduler.add_job(failure, 'date', [datetime(9999, 9, 9)])
+ scheduler.now = job.next_run_time
+ scheduler._process_jobs()
+ assert 'DummyException' in logstream.getvalue()
+
+ def test_misfire_grace_time(self, scheduler):
+ scheduler.misfire_grace_time = 3
+ job = scheduler.add_job(lambda: None, 'interval', {'seconds': 1})
+ assert job.misfire_grace_time == 3
+
+ job = scheduler.add_job(lambda: None, 'interval', {'seconds': 1}, misfire_grace_time=2)
+ assert job.misfire_grace_time == 2
+
+ def test_coalesce_on(self, scheduler):
+ """Tests that the job is only executed once when it is scheduled to be executed twice in a row."""
+
+ vals = [0]
+ events = []
+ scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
+ job = scheduler.add_job(increment, 'interval', {'seconds': 1, 'start_date': dummy_datetime},
+ args=(vals,), coalesce=True, misfire_grace_time=2)
+
+ # Turn the clock 2 seconds forward
+ scheduler.now += timedelta(seconds=2)
+
+ scheduler._process_jobs()
+ assert job.runs == 1
+ assert len(events) == 1
+ assert events[0].code == EVENT_JOB_EXECUTED
+ assert vals == [1]
+
+ def test_coalesce_off(self, scheduler):
+ """Tests that every scheduled run for the job is executed even when they are in the past
+ (but still within misfire_grace_time).
+ """
+
+ vals = [0]
+ events = []
+ scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
+ job = scheduler.add_job(increment, 'interval', {'seconds': 1, 'start_date': dummy_datetime},
+ args=(vals,), coalesce=False, misfire_grace_time=2)
+
+ # Turn the clock 2 seconds forward
+ scheduler.now += timedelta(seconds=2)
+
+ scheduler._process_jobs()
+ assert job.runs == 3
+ assert len(events) == 3
+ assert events[0].code == EVENT_JOB_EXECUTED
+ assert events[1].code == EVENT_JOB_EXECUTED
+ assert events[2].code == EVENT_JOB_EXECUTED
+ assert vals == [3]
+
+ def test_print_jobs(self, scheduler):
+ out = StringIO()
+ scheduler.print_jobs(out)
+ expected = 'Jobstore default:%s'\
+ ' No scheduled jobs%s' % (os.linesep, os.linesep)
+ assert out.getvalue() == expected
+
+ scheduler.add_job(copy, 'date', [datetime(2200, 5, 19)], args=[()])
+ out = StringIO()
+ scheduler.print_jobs(out)
+ expected = 'Jobstore default:%s '\
+ 'copy (trigger: date[2200-05-19 00:00:00 DUMMYTZ], '\
+ 'next run at: 2200-05-19 00:00:00 DUMMYTZ)%s' % (os.linesep, os.linesep)
+ assert out.getvalue() == expected
+
+ def test_jobstore(self, scheduler):
+ scheduler.add_jobstore(MemoryJobStore(), 'dummy')
+ job = scheduler.add_job(lambda: None, 'date', [datetime(2200, 7, 24)], jobstore='dummy')
+ assert scheduler.get_jobs() == [job]
+ scheduler.remove_jobstore('dummy')
+ assert scheduler.get_jobs() == []
+
+ def test_remove_nonexistent_jobstore(self, scheduler):
+ """Tests that KeyError is raised when trying to remove a job store that doesn't exist."""
+
+ pytest.raises(KeyError, scheduler.remove_jobstore, 'dummy2')
+
+ def test_job_next_run_time(self, scheduler):
+ """Tests against bug #5."""
+
+ vals = [0]
+ job = scheduler.add_job(increment, 'interval', {'seconds': 1, 'start_date': dummy_datetime},
+ args=(vals,), misfire_grace_time=3)
+
+ scheduler.now = job.next_run_time
+ scheduler._process_jobs()
+ assert vals == [1]
+
+ scheduler._process_jobs()
+ assert vals == [1]
+
+ scheduler.now = job.next_run_time + timedelta(seconds=1)
+ scheduler._process_jobs()
+ assert vals == [2]
+
+ def test_max_instances(self, scheduler):
+ """Tests that the maximum instance limit on a job is respected and that missed job events are dispatched when
+ the job cannot be run due to the instance limitation.
+ """
+
+ def wait_event():
+ vals[0] += 1
+ event.wait(2)
+
+ vals = [0]
+ events = []
+ event = Event()
+ shutdown_event = Event()
+ scheduler._threadpool = ThreadPool()
+ scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
+ scheduler.add_listener(lambda e: shutdown_event.set(), EVENT_SCHEDULER_SHUTDOWN)
+ scheduler.add_job(wait_event, 'interval', {'seconds': 1, 'start_date': dummy_datetime}, max_instances=2,
+ max_runs=4)
+ for _ in range(4):
+ scheduler._process_jobs()
+ scheduler.now += timedelta(seconds=1)
+ event.set()
+ scheduler.shutdown()
+ shutdown_event.wait(2)
+
+ assert vals == [2]
+ assert len(events) == 4
+ assert events[0].code == EVENT_JOB_MISSED
+ assert events[1].code == EVENT_JOB_MISSED
+ assert events[2].code == EVENT_JOB_EXECUTED
+ assert events[3].code == EVENT_JOB_EXECUTED
+
+
+@pytest.mark.start_scheduler
+class TestRunningScheduler(object):
+ def test_shutdown_timeout(self, scheduler):
+ scheduler.shutdown()
+
+ def test_scheduler_double_start(self, scheduler):
+ pytest.raises(SchedulerAlreadyRunningError, scheduler.start)
+
+ def test_scheduler_configure_running(self, scheduler):
+ pytest.raises(SchedulerAlreadyRunningError, scheduler.configure, {})
+
+ def test_scheduler_double_shutdown(self, scheduler):
+ scheduler.shutdown()
+ pytest.raises(SchedulerNotRunningError, scheduler.shutdown, False)
+
+
+class SchedulerImplementationTestBase(object):
+ @pytest.fixture
+ def scheduler(self, request):
+ sched = self.create_scheduler()
+ request.addfinalizer(lambda: self.finish(sched))
+ return sched
+
+ def create_scheduler(self):
+ raise NotImplementedError
+
+ def create_event(self):
+ return Event()
+
+ def process_events(self):
+ pass
+
+ def finish(self, sched):
+ pass
+
+ def test_scheduler_implementation(self, scheduler):
+ """Tests that starting the scheduler eventually calls _process_jobs()."""
+
+ class TimeRoller(object):
+ def __init__(self, start, step):
+ self.now = start
+ self.step = timedelta(seconds=step)
+
+ def next(self):
+ return self.now + self.step
+
+ def __call__(self):
+ now = self.now
+ self.now = self.next()
+ return now
+
+ events = []
+ vals = [0]
+ job_removed_event = self.create_event()
+ shutdown_event = self.create_event()
+ scheduler._threadpool = DummyThreadPool()
+
+ # Test that pending jobs are added (and if due, executed) when the scheduler starts
+ scheduler._current_time = time_roller = TimeRoller(dummy_datetime, 0.2)
+ scheduler.add_listener(events.append)
+ scheduler.add_listener(lambda e: job_removed_event.set(), EVENT_JOBSTORE_JOB_REMOVED)
+ scheduler.add_job(increment, 'date', [time_roller.next()], args=(vals,))
+ scheduler.start()
+ self.process_events()
+ job_removed_event.wait(2)
+ assert job_removed_event.is_set()
+ assert vals[0] == 1
+ assert len(events) == 5
+ assert events[0].code == EVENT_JOBSTORE_ADDED
+ assert events[1].code == EVENT_JOBSTORE_JOB_ADDED
+ assert events[2].code == EVENT_SCHEDULER_START
+ assert events[3].code == EVENT_JOB_EXECUTED
+ assert events[4].code == EVENT_JOBSTORE_JOB_REMOVED
+ del events[:]
+ job_removed_event.clear()
+
+ # Test that adding a job causes it to be executed after the specified delay
+ job = scheduler.add_job(increment, 'date', [time_roller.next() + time_roller.step * 2], args=(vals,))
+ self.process_events()
+ sleep(0.5)
+ self.process_events()
+ job_removed_event.wait(2)
+ assert job_removed_event.is_set()
+ assert vals[0] == 2
+ assert len(events) == 3
+ assert events[0].code == EVENT_JOBSTORE_JOB_ADDED
+ assert events[1].code == EVENT_JOB_EXECUTED
+ assert events[2].code == EVENT_JOBSTORE_JOB_REMOVED
+ del events[:]
+ job_removed_event.clear()
+
+ # Test that shutting down the scheduler emits the proper event
+ scheduler.add_listener(lambda e: shutdown_event.set(), EVENT_SCHEDULER_SHUTDOWN)
+ scheduler.shutdown()
+ self.process_events()
+ shutdown_event.wait(2)
+ assert shutdown_event.is_set()
+ assert len(events) == 1
+ assert events[0].code == EVENT_SCHEDULER_SHUTDOWN
+
+
+class TestBlockingScheduler(SchedulerImplementationTestBase):
+ def create_scheduler(self):
+ sched = BlockingScheduler()
+ self.thread = Thread(target=sched.start)
+ sched.start = self.thread.start
+ return sched
+
+ def finish(self, sched):
+ self.thread.join()
+
+
+class TestBackgroundScheduler(SchedulerImplementationTestBase):
+ def create_scheduler(self):
+ return BackgroundScheduler()
+
+
+class TestAsyncIOScheduler(SchedulerImplementationTestBase):
+ def create_scheduler(self):
+ asyncio = pytest.importorskip('apscheduler.schedulers.asyncio')
+ sched = asyncio.AsyncIOScheduler()
+ self.thread = Thread(target=sched._eventloop.run_forever)
+ self.thread.start()
+ return sched
+
+ def finish(self, sched):
+ sched._eventloop.call_soon_threadsafe(sched._eventloop.stop)
+ self.thread.join()
+
+
+class TestGeventScheduler(SchedulerImplementationTestBase):
+ def create_scheduler(self):
+ gevent = pytest.importorskip('apscheduler.schedulers.gevent')
+ return gevent.GeventScheduler()
+
+ def create_event(self):
+ from gevent.event import Event
+ return Event()
+
+
+class TestTornadoScheduler(SchedulerImplementationTestBase):
+ def create_scheduler(self):
+ tornado = pytest.importorskip('apscheduler.schedulers.tornado')
+ sched = tornado.TornadoScheduler()
+ self.thread = Thread(target=sched._ioloop.start)
+ self.thread.start()
+ return sched
+
+ def finish(self, sched):
+ sched._ioloop.add_callback(sched._ioloop.stop)
+ self.thread.join()
+
+
+class TestTwistedScheduler(SchedulerImplementationTestBase):
+ def create_scheduler(self):
+ twisted = pytest.importorskip('apscheduler.schedulers.twisted')
+ sched = twisted.TwistedScheduler()
+ self.thread = Thread(target=sched._reactor.run, args=(False,))
+ self.thread.start()
+ return sched
+
+ def finish(self, sched):
+ sched._reactor.callFromThread(sched._reactor.stop)
+ self.thread.join()
+
+
+class TestQtScheduler(SchedulerImplementationTestBase):
+ def create_scheduler(self):
+ qt = pytest.importorskip('apscheduler.schedulers.qt')
+ from PySide.QtCore import QCoreApplication
+ QCoreApplication([])
+ return qt.QtScheduler()
+
+ def process_events(self):
+ from PySide.QtCore import QCoreApplication
+ QCoreApplication.processEvents()
diff --git a/tests/test_util.py b/tests/test_util.py
index 64ca00a..e081ec8 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -1,16 +1,15 @@
# coding: utf-8
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, tzinfo
import time
import os
import shelve
-from dateutil.tz import tzoffset
+from dateutil.tz import tzoffset, gettz
import pytest
from apscheduler.util import *
from tests.conftest import minpython
-
local_tz = tzoffset('DUMMYTZ', 3600)
@@ -35,10 +34,6 @@ class DummyClass(object):
pass
-def meth():
- pass
-
-
def test_asint_invalid_1():
pytest.raises(ValueError, asint, '5s')
@@ -71,6 +66,25 @@ def test_asbool_fail():
pytest.raises(ValueError, asbool, 'yep')
+def test_astimezone_str():
+ value = astimezone('Europe/Helsinki')
+ assert isinstance(value, tzinfo)
+
+
+def test_astimezone_tz():
+ tz = gettz('Europe/Helsinki')
+ value = astimezone(tz)
+ assert tz is value
+
+
+def test_astimezone_none():
+ assert astimezone(None) is None
+
+
+def test_astimezone_fail():
+ pytest.raises(TypeError, astimezone, 4)
+
+
def test_convert_datetime_date():
dateval = date(2009, 8, 1)
datetimeval = convert_to_datetime(dateval, local_tz, None)
diff --git a/tox.ini b/tox.ini
index 995bd2e..36f2c2c 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,19 +1,39 @@
[tox]
-envlist = py26,py27,py32,py33,jython,pypy,pep8
+envlist = py26,py27,py32,py33,py34,pypy,pep8
-[testenv]
+[base]
deps=pytest
pytest-cov
sqlalchemy
pymongo
redis
+
+[testenv]
+deps={[base]deps}
+ twisted
+ gevent
+ tornado
commands=py.test []
-# Jython does not have SQLite, so don't download SQLAlchemy.
-[testenv:jython]
-deps=nose
- pymongo
- redis
+[testenv:py32]
+deps={[testenv:py34]deps}
+ cython
+ https://github.com/fantix/gevent/archive/master.zip
+
+[testenv:py33]
+deps={[testenv:py34]deps}
+ asyncio
+ cython
+ https://github.com/fantix/gevent/archive/master.zip
+
+[testenv:py34]
+deps={[base]deps}
+ tornado
+
+[testenv:pypy]
+deps={[base]deps}
+ tornado
+ twisted
[testenv:pep8]
deps=pep8