diff options
30 files changed, 1352 insertions, 795 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index fbbc417..f5090b8 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -7,6 +7,11 @@ APScheduler, see the :doc:`migration section <migration>`. 3.0.0 ----- +* Split the old Scheduler class into BlockingScheduler and BackgroundScheduler and added integration for + asyncio (PEP 3156), Gevent, Tornado, Twisted and Qt event loops + +* Added a hook for customizing reading of current time + * Added support for timezones * Removed the shortcuts to built-in triggers in the scheduler API so they no longer get preferential treatment @@ -15,17 +15,25 @@ provides features not present in Quartz (such as multiple job stores). Features ======== -* No (hard) external dependencies, except for setuptools/distribute * Thread-safe API -* Excellent test coverage (tested on CPython 2.6 - 2.7, 3.2 - 3.3, Jython 2.5.3, PyPy 1.9) +* Excellent test coverage (tested on CPython 2.6 - 2.7, 3.2 - 3.4, PyPy 2.2) * Configurable scheduling mechanisms (triggers): * Cron-like scheduling * Delayed scheduling of single run jobs (like the UNIX "at" command) * Interval-based (run a job at specified time intervals) +* Integrates with several frameworks: + + * `asyncio <http://docs.python.org/3.4/library/asyncio.html>`_ + (`PEP 3156 <http://www.python.org/dev/peps/pep-3156/>`_) + * `gevent <http://www.gevent.org/>`_ + * `Tornado <http://www.tornadoweb.org/>`_ + * `Twisted <http://twistedmatrix.com/>`_ + * `Qt <http://qt-project.org/>`_ (using either `PyQt <http://www.riverbankcomputing.com/software/pyqt/intro>`_ + or `PySide <http://qt-project.org/wiki/PySide>`_) * Multiple, simultaneously active job stores: - * RAM + * Memory * File-based simple database (shelve) * `SQLAlchemy <http://www.sqlalchemy.org/>`_ (any supported RDBMS works) * `MongoDB <http://www.mongodb.org/>`_ @@ -41,14 +49,13 @@ Documentation can be found `here <http://readthedocs.org/docs/apscheduler/en/lat Source ====== -The source can be browsed at `Bitbucket -<http://bitbucket.org/agronholm/apscheduler/src/>`_. +The source can be browsed at `Bitbucket <http://bitbucket.org/agronholm/apscheduler/src/>`_. Reporting bugs ============== -A `bug tracker <http://bitbucket.org/agronholm/apscheduler/issues/>`_ +A `bug tracker <https://bitbucket.org/agronholm/apscheduler/issues?status=new&status=open>`_ is provided by bitbucket.org. @@ -57,7 +64,5 @@ Getting help If you have problems or other questions, you can either: -* Ask on the `APScheduler Google group - <http://groups.google.com/group/apscheduler>`_, or -* Ask on the ``#apscheduler`` channel on - `Freenode IRC <http://freenode.net/irc_servers.shtml>`_ +* Ask on the `APScheduler Google group <http://groups.google.com/group/apscheduler>`_, or +* Ask on the ``#apscheduler`` channel on `Freenode IRC <http://freenode.net/irc_servers.shtml>`_ diff --git a/apscheduler/__init__.py b/apscheduler/__init__.py index d5b383d..4d7ecb0 100644 --- a/apscheduler/__init__.py +++ b/apscheduler/__init__.py @@ -1,3 +1,3 @@ -version_info = (3, 0, 0, 'dev1') +version_info = (3, 0, 0, 'pre1') version = '.'.join(str(n) for n in version_info[:3]) release = '.'.join(str(n) for n in version_info) diff --git a/apscheduler/schedulers/__init__.py b/apscheduler/schedulers/__init__.py new file mode 100644 index 0000000..bd8a790 --- /dev/null +++ b/apscheduler/schedulers/__init__.py @@ -0,0 +1,12 @@ +class SchedulerAlreadyRunningError(Exception): + """Raised when attempting to start or configure the scheduler when it's already running.""" + + def __str__(self): + return 'Scheduler is already running' + + +class SchedulerNotRunningError(Exception): + """Raised when attempting to shutdown the scheduler when it's not running.""" + + def __str__(self): + return 'Scheduler is not running' diff --git a/apscheduler/schedulers/asyncio.py b/apscheduler/schedulers/asyncio.py new file mode 100644 index 0000000..8bd35ee --- /dev/null +++ b/apscheduler/schedulers/asyncio.py @@ -0,0 +1,53 @@ +from __future__ import absolute_import +from functools import wraps + +from apscheduler.schedulers.base import BaseScheduler +from apscheduler.util import maybe_ref + +try: + import asyncio +except ImportError: # pragma: nocover + raise ImportError('AsyncIOScheduler requires either Python 3.4 or the asyncio package installed') + + +def run_in_event_loop(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + self._eventloop.call_soon_threadsafe(func, self, *args, **kwargs) + return wrapper + + +class AsyncIOScheduler(BaseScheduler): + """A scheduler that runs on an asyncio (PEP 3156) event loop.""" + + _eventloop = None + _timeout = None + + def start(self): + super(AsyncIOScheduler, self).start() + self._wakeup() + + @run_in_event_loop + def shutdown(self, wait=True): + super(AsyncIOScheduler, self).shutdown(wait) + self._stop_timer() + + def _configure(self, config): + self._eventloop = maybe_ref(config.pop('event_loop', None)) or asyncio.get_event_loop() + super(AsyncIOScheduler, self)._configure(config) + + def _start_timer(self, wait_seconds): + self._stop_timer() + if wait_seconds is not None: + self._timeout = self._eventloop.call_later(wait_seconds, self._wakeup) + + def _stop_timer(self): + if self._timeout: + self._timeout.cancel() + del self._timeout + + @run_in_event_loop + def _wakeup(self): + self._stop_timer() + wait_seconds = self._process_jobs() + self._start_timer(wait_seconds) diff --git a/apscheduler/schedulers/background.py b/apscheduler/schedulers/background.py new file mode 100644 index 0000000..c6fdff3 --- /dev/null +++ b/apscheduler/schedulers/background.py @@ -0,0 +1,28 @@ +from __future__ import absolute_import +from threading import Thread, Event + +from apscheduler.schedulers.base import BaseScheduler +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.util import asbool + + +class BackgroundScheduler(BlockingScheduler): + """A scheduler that runs in the background using a separate thread.""" + + _thread = None + + def _configure(self, config): + self._daemon = asbool(config.pop('daemon', True)) + super(BackgroundScheduler, self)._configure(config) + + def start(self): + BaseScheduler.start(self) + self._event = Event() + self._thread = Thread(target=self._main_loop, name='APScheduler') + self._thread.daemon = self._daemon + self._thread.start() + + def shutdown(self, wait=True): + super(BackgroundScheduler, self).shutdown(wait) + self._thread.join() + del self._thread diff --git a/apscheduler/scheduler.py b/apscheduler/schedulers/base.py index 1be0162..c4a6af0 100644 --- a/apscheduler/scheduler.py +++ b/apscheduler/schedulers/base.py @@ -1,7 +1,8 @@ """ This module is the main part of the library. It houses the Scheduler class and related exceptions. """ -from threading import Thread, Event, Lock +from abc import ABCMeta, abstractmethod +from threading import Lock from datetime import datetime, timedelta from logging import getLogger from collections import Mapping, Iterable @@ -10,9 +11,11 @@ import os import sys from pkg_resources import iter_entry_points -from dateutil.tz import gettz, tzlocal -from six import string_types, u, itervalues, iteritems +from dateutil.tz import tzlocal +from six import u, itervalues, iteritems +import six +from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError from apscheduler.util import * from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.job import Job, MaxInstancesReachedError @@ -24,33 +27,23 @@ try: except ImportError: from inspect import getargspec -logger = getLogger(__name__) +class BaseScheduler(six.with_metaclass(ABCMeta)): + """Base class for all schedulers.""" -class SchedulerAlreadyRunningError(Exception): - """ - Raised when attempting to start or configure the scheduler when it's already running. - """ - - def __str__(self): - return 'Scheduler is already running' - - -class Scheduler(object): - """ - This class is responsible for scheduling jobs and triggering their execution. - """ - - _stopped = False - _thread = None + _stopped = True _plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers')) + # + # Public API + # + def __init__(self, gconfig={}, **options): - self._wakeup = Event() + super(BaseScheduler, self).__init__() self._jobstores = {} - self._jobstores_lock = Lock() + self._jobstores_lock = self._create_lock() self._listeners = [] - self._listeners_lock = Lock() + self._listeners_lock = self._create_lock() self._pending_jobs = [] self._triggers = {} self.configure(gconfig, **options) @@ -62,47 +55,13 @@ class Scheduler(object): if self.running: raise SchedulerAlreadyRunningError - # Set general options config = combine_opts(gconfig, 'apscheduler.', options) - self.misfire_grace_time = int(config.pop('misfire_grace_time', 1)) - self.coalesce = asbool(config.pop('coalesce', True)) - self.daemonic = asbool(config.pop('daemonic', True)) - self.standalone = asbool(config.pop('standalone', False)) - timezone = config.pop('timezone', None) - self.timezone = gettz(timezone) if isinstance(timezone, string_types) else timezone or tzlocal() - - # Set trigger defaults - self.trigger_defaults = {'timezone': self.timezone} - - # Configure the thread pool - if 'threadpool' in config: - self._threadpool = maybe_ref(config['threadpool']) - else: - threadpool_opts = combine_opts(config, 'threadpool.') - self._threadpool = ThreadPool(**threadpool_opts) - - # Configure job stores - jobstore_opts = combine_opts(config, 'jobstore.') - jobstores = {} - for key, value in jobstore_opts.items(): - store_name, option = key.split('.', 1) - opts_dict = jobstores.setdefault(store_name, {}) - opts_dict[option] = value - - for alias, opts in jobstores.items(): - classname = opts.pop('class') - cls = maybe_ref(classname) - jobstore = cls(**opts) - self.add_jobstore(jobstore, alias, True) + self._configure(config) + @abstractmethod def start(self): - """ - Starts the scheduler in a new thread. + """Starts the scheduler. The details of this process depend on the implementation.""" - In threaded mode (the default), this method will return immediately after starting the scheduler thread. - - In standalone mode, this method will block until there are no more scheduled jobs. - """ if self.running: raise SchedulerAlreadyRunningError @@ -116,44 +75,36 @@ class Scheduler(object): del self._pending_jobs[:] self._stopped = False - if self.standalone: - self._main_loop() - else: - self._thread = Thread(target=self._main_loop, name='APScheduler') - self._thread.setDaemon(self.daemonic) - self._thread.start() + self.logger.info('Scheduler started') + + # Notify listeners that the scheduler has been started + self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START)) - def shutdown(self, wait=True, shutdown_threadpool=True, close_jobstores=True): + @abstractmethod + def shutdown(self, wait=True): """ - Shuts down the scheduler and terminates the thread. Does not interrupt any currently running jobs. + Shuts down the scheduler. Does not interrupt any currently running jobs. - :param wait: ``True`` to wait until all currently executing jobs have finished (if ``shutdown_threadpool`` is - also ``True``) - :param shutdown_threadpool: ``True`` to shut down the thread pool - :param close_jobstores: ``True`` to close all job stores after shutdown + :param wait: ``True`` to wait until all currently executing jobs have finished """ if not self.running: - return + raise SchedulerNotRunningError self._stopped = True - self._wakeup.set() # Shut down the thread pool - if shutdown_threadpool: - self._threadpool.shutdown(wait) - - # Wait until the scheduler thread terminates - if self._thread: - self._thread.join() + self._threadpool.shutdown(wait) # Close all job stores - if close_jobstores: - for jobstore in itervalues(self._jobstores): - jobstore.close() + for jobstore in itervalues(self._jobstores): + jobstore.close() + + self.logger.info('Scheduler has been shut down') + self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)) @property def running(self): - return not self._stopped and self._thread and self._thread.isAlive() + return not self._stopped def add_jobstore(self, jobstore, alias, quiet=False): """ @@ -175,8 +126,8 @@ class Scheduler(object): self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias)) # Notify the scheduler so it can scan the new job store for jobs - if not quiet: - self._wakeup.set() + if not quiet and self.running: + self._wakeup() def remove_jobstore(self, alias, close=True): """ @@ -218,80 +169,6 @@ class Scheduler(object): if callback == cb: del self._listeners[i] - def _notify_listeners(self, event): - with self._listeners_lock: - listeners = tuple(self._listeners) - - for cb, mask in listeners: - if event.code & mask: - try: - cb(event) - except: - logger.exception('Error notifying listener') - - def _real_add_job(self, job, jobstore, wakeup): - # Recalculate the next run time - job.compute_next_run_time(datetime.now(self.timezone)) - - # Add the job to the given job store - store = self._jobstores.get(jobstore) - if not store: - raise KeyError('No such job store: %s' % jobstore) - store.add_job(job) - - # Notify listeners that a new job has been added - event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job) - self._notify_listeners(event) - - logger.info('Added job "%s" to job store "%s"', job, jobstore) - - # Notify the scheduler about the new job - if wakeup: - self._wakeup.set() - - @staticmethod - def _check_callable_args(func, args, kwargs): - if not isfunction(func) and not ismethod(func) and hasattr(func, '__call__'): - func = func.__call__ - argspec = getargspec(func) - argspec_args = argspec.args[1:] if ismethod(func) else argspec.args - varkw = getattr(argspec, 'varkw', None) or getattr(argspec, 'keywords', None) - kwargs_set = frozenset(kwargs) - mandatory_args = frozenset(argspec_args[:-len(argspec.defaults)] if argspec.defaults else argspec_args) - mandatory_args_matches = frozenset(argspec_args[:len(args)]) - mandatory_kwargs_matches = set(kwargs).intersection(mandatory_args) - kwonly_args = frozenset(getattr(argspec, 'kwonlyargs', [])) - kwonly_defaults = frozenset(getattr(argspec, 'kwonlydefaults', None) or ()) - - # Make sure there are no conflicts between args and kwargs - pos_kwargs_conflicts = mandatory_args_matches.intersection(mandatory_kwargs_matches) - if pos_kwargs_conflicts: - raise ValueError('The following arguments are supplied in both args and kwargs: %s' % - ', '.join(pos_kwargs_conflicts)) - - # Check that the number of positional arguments minus the number of matched kwargs matches the argspec - missing_args = mandatory_args - mandatory_args_matches.union(mandatory_kwargs_matches) - if missing_args: - raise ValueError('The following arguments are not supplied: %s' % ', '.join(missing_args)) - - # Check that the callable can accept the given number of positional arguments - if not argspec.varargs and len(args) > len(argspec_args): - raise ValueError('The list of positional arguments is longer than the target callable can handle ' - '(allowed: %d, given in args: %d)' % (len(argspec_args), len(args))) - - # Check that the callable can accept the given keyword arguments - if not varkw: - unmatched_kwargs = kwargs_set - frozenset(argspec_args).union(kwonly_args) - if unmatched_kwargs: - raise ValueError('The target callable does not accept the following keyword arguments: %s' % - ', '.join(unmatched_kwargs)) - - # Check that all keyword-only arguments have been supplied - unmatched_kwargs = kwonly_args - kwargs_set - kwonly_defaults - if unmatched_kwargs: - raise ValueError('The following keyword-only arguments have not been supplied in kwargs: %s' % - ', '.join(unmatched_kwargs)) - def add_job(self, func, trigger, trigger_args=(), args=None, kwargs=None, misfire_grace_time=None, coalesce=None, name=None, max_runs=None, max_instances=1, jobstore='default'): """ @@ -367,13 +244,13 @@ class Scheduler(object): self._check_callable_args(job.func, args, kwargs) # Ensure that dead-on-arrival jobs are never added - if job.compute_next_run_time(datetime.now(self.timezone)) is None: + if job.compute_next_run_time(self._current_time()) is None: raise ValueError('Not adding job since it would never be run') # Don't really add jobs to job stores before the scheduler is up and running if not self.running: self._pending_jobs.append((job, jobstore)) - logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts') + self.logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts') else: self._real_add_job(job, jobstore, True) @@ -388,15 +265,6 @@ class Scheduler(object): return func return inner - def _remove_job(self, job, alias, jobstore): - jobstore.remove_job(job) - - # Notify listeners that a job has been removed - event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job) - self._notify_listeners(event) - - logger.info('Removed job "%s"', job) - def get_jobs(self): """ Returns a list of all scheduled jobs. @@ -456,31 +324,161 @@ class Scheduler(object): out.write(os.linesep.join(job_strs) + os.linesep) + # + # Protected API + # + + def _configure(self, config): + # Set general options + self.logger = maybe_ref(config.pop('logger', None)) or getLogger('apscheduler') + self.misfire_grace_time = int(config.pop('misfire_grace_time', 1)) + self.coalesce = asbool(config.pop('coalesce', True)) + self.timezone = astimezone(config.pop('timezone', None)) or tzlocal() + + # Set trigger defaults + self.trigger_defaults = {'timezone': self.timezone} + + # Configure the thread pool + if 'threadpool' in config: + self._threadpool = maybe_ref(config['threadpool']) + else: + threadpool_opts = combine_opts(config, 'threadpool.') + self._threadpool = ThreadPool(**threadpool_opts) + + # Configure job stores + jobstore_opts = combine_opts(config, 'jobstore.') + jobstores = {} + for key, value in jobstore_opts.items(): + store_name, option = key.split('.', 1) + opts_dict = jobstores.setdefault(store_name, {}) + opts_dict[option] = value + + for alias, opts in jobstores.items(): + classname = opts.pop('class') + cls = maybe_ref(classname) + jobstore = cls(**opts) + self.add_jobstore(jobstore, alias, True) + + def _notify_listeners(self, event): + with self._listeners_lock: + listeners = tuple(self._listeners) + + for cb, mask in listeners: + if event.code & mask: + try: + cb(event) + except: + self.logger.exception('Error notifying listener') + + def _real_add_job(self, job, jobstore, wakeup): + # Recalculate the next run time + job.compute_next_run_time(self._current_time()) + + # Add the job to the given job store + store = self._jobstores.get(jobstore) + if not store: + raise KeyError('No such job store: %s' % jobstore) + store.add_job(job) + + # Notify listeners that a new job has been added + event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job) + self._notify_listeners(event) + + self.logger.info('Added job "%s" to job store "%s"', job, jobstore) + + # Notify the scheduler about the new job + if wakeup: + self._wakeup() + + def _remove_job(self, job, alias, jobstore): + jobstore.remove_job(job) + + # Notify listeners that a job has been removed + event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job) + self._notify_listeners(event) + + self.logger.info('Removed job "%s"', job) + + @staticmethod + def _check_callable_args(func, args, kwargs): + """Ensures that the given callable can be called with the given arguments.""" + + if not isfunction(func) and not ismethod(func) and hasattr(func, '__call__'): + func = func.__call__ + argspec = getargspec(func) + argspec_args = argspec.args[1:] if ismethod(func) else argspec.args + varkw = getattr(argspec, 'varkw', None) or getattr(argspec, 'keywords', None) + kwargs_set = frozenset(kwargs) + mandatory_args = frozenset(argspec_args[:-len(argspec.defaults)] if argspec.defaults else argspec_args) + mandatory_args_matches = frozenset(argspec_args[:len(args)]) + mandatory_kwargs_matches = set(kwargs).intersection(mandatory_args) + kwonly_args = frozenset(getattr(argspec, 'kwonlyargs', [])) + kwonly_defaults = frozenset(getattr(argspec, 'kwonlydefaults', None) or ()) + + # Make sure there are no conflicts between args and kwargs + pos_kwargs_conflicts = mandatory_args_matches.intersection(mandatory_kwargs_matches) + if pos_kwargs_conflicts: + raise ValueError('The following arguments are supplied in both args and kwargs: %s' % + ', '.join(pos_kwargs_conflicts)) + + # Check that the number of positional arguments minus the number of matched kwargs matches the argspec + missing_args = mandatory_args - mandatory_args_matches.union(mandatory_kwargs_matches) + if missing_args: + raise ValueError('The following arguments are not supplied: %s' % ', '.join(missing_args)) + + # Check that the callable can accept the given number of positional arguments + if not argspec.varargs and len(args) > len(argspec_args): + raise ValueError('The list of positional arguments is longer than the target callable can handle ' + '(allowed: %d, given in args: %d)' % (len(argspec_args), len(args))) + + # Check that the callable can accept the given keyword arguments + if not varkw: + unmatched_kwargs = kwargs_set - frozenset(argspec_args).union(kwonly_args) + if unmatched_kwargs: + raise ValueError('The target callable does not accept the following keyword arguments: %s' % + ', '.join(unmatched_kwargs)) + + # Check that all keyword-only arguments have been supplied + unmatched_kwargs = kwonly_args - kwargs_set - kwonly_defaults + if unmatched_kwargs: + raise ValueError('The following keyword-only arguments have not been supplied in kwargs: %s' % + ', '.join(unmatched_kwargs)) + + @abstractmethod + def _wakeup(self): + """Triggers :meth:`_process_jobs` to be run in an implementation specific manner.""" + + def _create_lock(self): + return Lock() + + def _current_time(self): + return datetime.now(self.timezone) + def _run_job(self, job, run_times): - """ - Acts as a harness that runs the actual job code in a thread. - """ + """Acts as a harness that runs the actual job code in the thread pool.""" + for run_time in run_times: # See if the job missed its run time window, and handle possible # misfires accordingly - difference = datetime.now(self.timezone) - run_time + difference = self._current_time() - run_time grace_time = timedelta(seconds=job.misfire_grace_time) if difference > grace_time: # Notify listeners about a missed run event = JobEvent(EVENT_JOB_MISSED, job, run_time) self._notify_listeners(event) - logger.warning('Run time of job "%s" was missed by %s', job, difference) + self.logger.warning('Run time of job "%s" was missed by %s', job, difference) else: try: job.add_instance() except MaxInstancesReachedError: event = JobEvent(EVENT_JOB_MISSED, job, run_time) self._notify_listeners(event) - logger.warning('Execution of job "%s" skipped: maximum number of running instances reached (%d)', - job, job.max_instances) + self.logger.warning( + 'Execution of job "%s" skipped: maximum number of running instances reached (%d)', job, + job.max_instances) break - logger.info('Running job "%s" (scheduled at %s)', job, run_time) + self.logger.info('Running job "%s" (scheduled at %s)', job, run_time) try: retval = job.func(*job.args, **job.kwargs) @@ -490,13 +488,13 @@ class Scheduler(object): event = JobEvent(EVENT_JOB_ERROR, job, run_time, exception=exc, traceback=tb) self._notify_listeners(event) - logger.exception('Job "%s" raised an exception', job) + self.logger.exception('Job "%s" raised an exception', job) else: # Notify listeners about successful execution event = JobEvent(EVENT_JOB_EXECUTED, job, run_time, retval=retval) self._notify_listeners(event) - logger.info('Job "%s" executed successfully', job) + self.logger.info('Job "%s" executed successfully', job) job.remove_instance() @@ -504,11 +502,15 @@ class Scheduler(object): if job.coalesce: break - def _process_jobs(self, now): - """ - Iterates through jobs in every jobstore, starts pending jobs and figures out the next wakeup time. + def _process_jobs(self): + """Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for + the next round. """ + + self.logger.debug('Looking for jobs to run') + now = self._current_time() next_wakeup_time = None + with self._jobstores_lock: for alias, jobstore in iteritems(self._jobstores): for job in tuple(jobstore.jobs): @@ -517,10 +519,7 @@ class Scheduler(object): self._threadpool.submit(self._run_job, job, run_times) # Increase the job's run count - if job.coalesce: - job.runs += 1 - else: - job.runs += len(run_times) + job.runs += 1 if job.coalesce else len(run_times) # Update the job, but don't keep finished jobs around if job.compute_next_run_time(now + timedelta(microseconds=1)): @@ -532,35 +531,13 @@ class Scheduler(object): next_wakeup_time = job.next_run_time elif job.next_run_time: next_wakeup_time = min(next_wakeup_time, job.next_run_time) - return next_wakeup_time - - def _main_loop(self): - """Executes jobs on schedule.""" - - logger.info('Scheduler started') - self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START)) - self._wakeup.clear() - while not self._stopped: - logger.debug('Looking for jobs to run') - now = datetime.now(self.timezone) - next_wakeup_time = self._process_jobs(now) - - # Sleep until the next job is scheduled to be run, - # a new job is added or the scheduler is stopped - if next_wakeup_time is not None: - wait_seconds = time_difference(next_wakeup_time, now) - logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) - self._wakeup.wait(wait_seconds) - self._wakeup.clear() - elif self.standalone: - logger.debug('No jobs left; shutting down scheduler') - self.shutdown() - break - else: - logger.debug('No jobs; waiting until a job is added') - self._wakeup.wait() - self._wakeup.clear() + # Determine the delay until this method should be called again + if next_wakeup_time is not None: + wait_seconds = time_difference(next_wakeup_time, now) + self.logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) + else: + wait_seconds = None + self.logger.debug('No jobs; waiting until a job is added') - logger.info('Scheduler has been shut down') - self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)) + return wait_seconds diff --git a/apscheduler/schedulers/blocking.py b/apscheduler/schedulers/blocking.py new file mode 100644 index 0000000..a910ccf --- /dev/null +++ b/apscheduler/schedulers/blocking.py @@ -0,0 +1,28 @@ +from __future__ import absolute_import +from threading import Event + +from apscheduler.schedulers.base import BaseScheduler + + +class BlockingScheduler(BaseScheduler): + """A scheduler that runs in the foreground. Calling :meth:`start` will block.""" + + _event = None + + def start(self): + super(BlockingScheduler, self).start() + self._event = Event() + self._main_loop() + + def shutdown(self, wait=True): + super(BlockingScheduler, self).shutdown(wait) + self._event.set() + + def _main_loop(self): + while self.running: + wait_seconds = self._process_jobs() + self._event.wait(wait_seconds) + self._event.clear() + + def _wakeup(self): + self._event.set() diff --git a/apscheduler/schedulers/gevent.py b/apscheduler/schedulers/gevent.py new file mode 100644 index 0000000..5962708 --- /dev/null +++ b/apscheduler/schedulers/gevent.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import + +from gevent.lock import RLock + +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.schedulers.base import BaseScheduler + +try: + from gevent.event import Event + import gevent +except ImportError: # pragma: nocover + raise ImportError('GeventScheduler requires gevent installed') + + +class GeventScheduler(BlockingScheduler): + """A scheduler that runs as a Gevent greenlet.""" + + _greenlet = None + + def start(self): + BaseScheduler.start(self) + self._event = Event() + self._greenlet = gevent.spawn(self._main_loop) + return self._greenlet + + def shutdown(self, wait=True): + super(GeventScheduler, self).shutdown(wait) + self._greenlet.join() + del self._greenlet + + def _create_lock(self): + return RLock() diff --git a/apscheduler/schedulers/qt.py b/apscheduler/schedulers/qt.py new file mode 100644 index 0000000..61174ab --- /dev/null +++ b/apscheduler/schedulers/qt.py @@ -0,0 +1,46 @@ +from __future__ import absolute_import + +from apscheduler.schedulers.base import BaseScheduler + +try: + from PyQt5.QtCore import QObject, QTimer +except ImportError: # pragma: nocover + try: + from PyQt4.QtCore import QObject, QTimer + except ImportError: + try: + from PySide.QtCore import QObject, QTimer + except ImportError: + raise ImportError('QtScheduler requires either PyQt5, PyQt4 or PySide installed') + + +class QtScheduler(BaseScheduler): + """A scheduler that runs in a Qt event loop.""" + + _timer = None + + def start(self): + super(QtScheduler, self).start() + self._wakeup() + + def shutdown(self, wait=True): + super(QtScheduler, self).shutdown(wait) + self._stop_timer() + + def _start_timer(self, wait_seconds): + self._stop_timer() + if wait_seconds is not None: + self._timer = QTimer.singleShot(wait_seconds * 1000, self._process_jobs) + + def _stop_timer(self): + if self._timer: + if self._timer.isActive(): + self._timer.stop() + del self._timer + + def _wakeup(self): + self._start_timer(0) + + def _process_jobs(self): + wait_seconds = super(QtScheduler, self)._process_jobs() + self._start_timer(wait_seconds) diff --git a/apscheduler/schedulers/tornado.py b/apscheduler/schedulers/tornado.py new file mode 100644 index 0000000..f241371 --- /dev/null +++ b/apscheduler/schedulers/tornado.py @@ -0,0 +1,54 @@ +from __future__ import absolute_import +from datetime import timedelta +from functools import wraps + +from apscheduler.schedulers.base import BaseScheduler +from apscheduler.util import maybe_ref + +try: + from tornado.ioloop import IOLoop +except ImportError: # pragma: nocover + raise ImportError('TornadoScheduler requires tornado installed') + + +def run_in_ioloop(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + self._ioloop.add_callback(func, self, *args, **kwargs) + return wrapper + + +class TornadoScheduler(BaseScheduler): + """A scheduler that runs on a Tornado IOLoop.""" + + _ioloop = None + _timeout = None + + def start(self): + super(TornadoScheduler, self).start() + self._wakeup() + + @run_in_ioloop + def shutdown(self, wait=True): + super(TornadoScheduler, self).shutdown(wait) + self._stop_timer() + + def _configure(self, config): + self._ioloop = maybe_ref(config.pop('io_loop', None)) or IOLoop.current() + super(TornadoScheduler, self)._configure(config) + + def _start_timer(self, wait_seconds): + self._stop_timer() + if wait_seconds is not None: + self._timeout = self._ioloop.add_timeout(timedelta(seconds=wait_seconds), self._wakeup) + + def _stop_timer(self): + if self._timeout: + self._ioloop.remove_timeout(self._timeout) + del self._timeout + + @run_in_ioloop + def _wakeup(self): + self._stop_timer() + wait_seconds = self._process_jobs() + self._start_timer(wait_seconds) diff --git a/apscheduler/schedulers/twisted.py b/apscheduler/schedulers/twisted.py new file mode 100644 index 0000000..2c1e8fb --- /dev/null +++ b/apscheduler/schedulers/twisted.py @@ -0,0 +1,53 @@ +from __future__ import absolute_import +from functools import wraps + +from apscheduler.schedulers.base import BaseScheduler +from apscheduler.util import maybe_ref + +try: + from twisted.internet import reactor as default_reactor +except ImportError: # pragma: nocover + raise ImportError('TwistedScheduler requires Twisted installed') + + +def run_in_reactor(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + self._reactor.callFromThread(func, self, *args, **kwargs) + return wrapper + + +class TwistedScheduler(BaseScheduler): + """A scheduler that runs on a Twisted reactor.""" + + _reactor = None + _delayedcall = None + + def _configure(self, config): + self._reactor = maybe_ref(config.pop('reactor', default_reactor)) + super(TwistedScheduler, self)._configure(config) + + def start(self): + super(TwistedScheduler, self).start() + self._wakeup() + + @run_in_reactor + def shutdown(self, wait=True): + super(TwistedScheduler, self).shutdown(wait) + self._stop_timer() + + def _start_timer(self, wait_seconds): + self._stop_timer() + if wait_seconds is not None: + self._delayedcall = self._reactor.callLater(wait_seconds, self._wakeup) + + def _stop_timer(self): + if self._delayedcall and self._delayedcall.active(): + self._delayedcall.cancel() + del self._delayedcall + + @run_in_reactor + def _wakeup(self): + self._stop_timer() + wait_seconds = self._process_jobs() + self._start_timer(wait_seconds) diff --git a/apscheduler/util.py b/apscheduler/util.py index b9b5c2f..163288e 100644 --- a/apscheduler/util.py +++ b/apscheduler/util.py @@ -2,15 +2,15 @@ This module contains several handy functions primarily meant for internal use. """ -from datetime import date, datetime, timedelta +from datetime import date, datetime, timedelta, tzinfo from time import mktime import re from dateutil.tz import gettz from six import string_types -__all__ = ('asint', 'asbool', 'convert_to_datetime', 'timedelta_seconds', 'time_difference', 'datetime_ceil', - 'combine_opts', 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref') +__all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'timedelta_seconds', 'time_difference', + 'datetime_ceil', 'combine_opts', 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref') def asint(text): @@ -40,6 +40,21 @@ def asbool(obj): return bool(obj) +def astimezone(obj): + """ + Interprets an object as a timezone. + + :rtype: :class:`~datetime.tzinfo` + """ + + if isinstance(obj, string_types): + return gettz(obj) + if isinstance(obj, tzinfo): + return obj + if obj is not None: + raise TypeError('Expected tzinfo, got %s instead' % obj.__class__.__name__) + + _DATE_REGEX = re.compile( r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})' r'(?: (?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})' diff --git a/docs/index.rst b/docs/index.rst index e8c09b3..b2a0c23 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -23,18 +23,26 @@ provides features not present in Quartz (such as multiple job stores). Features -------- -* No (hard) external dependencies, except for setuptools/distribute * Thread-safe API -* Excellent test coverage (tested on CPython 2.6 - 2.7, 3.2 - 3.3, Jython 2.5.3, PyPy 1.9) +* Excellent test coverage (tested on CPython 2.6 - 2.7, 3.2 - 3.4, PyPy 2.2) * Configurable scheduling mechanisms (triggers): * Cron-like scheduling * Delayed scheduling of single run jobs (like the UNIX "at" command) * Interval-based (run a job at specified time intervals) +* Integrates with several frameworks: + + * `asyncio <http://docs.python.org/3.4/library/asyncio.html>`_ + (`PEP 3156 <http://www.python.org/dev/peps/pep-3156/>`_) + * `gevent <http://www.gevent.org/>`_ + * `Tornado <http://www.tornadoweb.org/>`_ + * `Twisted <http://twistedmatrix.com/>`_ + * `Qt <http://qt-project.org/>`_ (using either `PyQt <http://www.riverbankcomputing.com/software/pyqt/intro>`_ + or `PySide <http://qt-project.org/wiki/PySide>`_) * Multiple, simultaneously active job stores: - * RAM - * File-based simple database (:py:mod:`shelve`) + * Memory + * File-based simple database (shelve) * `SQLAlchemy <http://www.sqlalchemy.org/>`_ (any supported RDBMS works) * `MongoDB <http://www.mongodb.org/>`_ * `Redis <http://redis.io/>`_ diff --git a/examples/persistent.py b/examples/persistent.py index b75b778..1e1a155 100644 --- a/examples/persistent.py +++ b/examples/persistent.py @@ -6,8 +6,8 @@ You can exit the program, restart it and observe that any previous alarms that h from datetime import datetime, timedelta -from apscheduler.scheduler import Scheduler -from apscheduler.jobstores.shelve_store import ShelveJobStore +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.jobstores.shelve import ShelveJobStore def alarm(time): @@ -15,12 +15,13 @@ def alarm(time): if __name__ == '__main__': - scheduler = Scheduler(standalone=True) + scheduler = BlockingScheduler() scheduler.add_jobstore(ShelveJobStore('example.db'), 'shelve') alarm_time = datetime.now() + timedelta(seconds=10) scheduler.add_job(alarm, 'simple', [alarm_time], jobstore='shelve', args=[datetime.now()]) print('To clear the alarms, delete the example.db file.') print('Press Ctrl+C to exit') + try: scheduler.start() except (KeyboardInterrupt, SystemExit): diff --git a/examples/reference.py b/examples/reference.py index dc9756c..2b0c414 100644 --- a/examples/reference.py +++ b/examples/reference.py @@ -2,13 +2,14 @@ Basic example showing how to schedule a callable using a textual reference. """ -from apscheduler.scheduler import Scheduler +from apscheduler.schedulers.blocking import BlockingScheduler if __name__ == '__main__': - scheduler = Scheduler(standalone=True) + scheduler = BlockingScheduler() scheduler.add_job('sys:stdout.write', 'interval', {'seconds': 3}, args=['tick\n']) print('Press Ctrl+C to exit') + try: scheduler.start() except (KeyboardInterrupt, SystemExit): diff --git a/examples/schedulers/asyncio_.py b/examples/schedulers/asyncio_.py new file mode 100644 index 0000000..01cc138 --- /dev/null +++ b/examples/schedulers/asyncio_.py @@ -0,0 +1,25 @@ +""" +Demonstrates how to use the Tornado compatible scheduler to schedule a job that executes on 3 second intervals. +""" + +from datetime import datetime +import asyncio + +from apscheduler.schedulers.asyncio import AsyncIOScheduler + + +def tick(): + print('Tick! The time is: %s' % datetime.now()) + + +if __name__ == '__main__': + scheduler = AsyncIOScheduler() + scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.start() + print('Press Ctrl+C to exit') + + # Execution will block here until Ctrl+C is pressed. + try: + asyncio.get_event_loop().run_forever() + except (KeyboardInterrupt, SystemExit): + pass diff --git a/examples/schedulers/background.py b/examples/schedulers/background.py new file mode 100644 index 0000000..8a3dde5 --- /dev/null +++ b/examples/schedulers/background.py @@ -0,0 +1,26 @@ +""" +Demonstrates how to use the background scheduler to schedule a job that executes on 3 second intervals. +""" + +from datetime import datetime +import time + +from apscheduler.schedulers.background import BackgroundScheduler + + +def tick(): + print('Tick! The time is: %s' % datetime.now()) + + +if __name__ == '__main__': + scheduler = BackgroundScheduler() + scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.start() + print('Press Ctrl+C to exit') + + try: + # This is here to simulate application activity (which keeps the main thread alive). + while True: + time.sleep(2) + except (KeyboardInterrupt, SystemExit): + scheduler.shutdown() # Not strictly necessary if daemonic mode is enabled but should be done if possible diff --git a/examples/interval.py b/examples/schedulers/blocking.py index 5469fef..55d1385 100644 --- a/examples/interval.py +++ b/examples/schedulers/blocking.py @@ -1,10 +1,10 @@ """ -Basic example showing how to start the scheduler and schedule a job that executes on 3 second intervals. +Demonstrates how to use the blocking scheduler to schedule a job that executes on 3 second intervals. """ from datetime import datetime -from apscheduler.scheduler import Scheduler +from apscheduler.schedulers.blocking import BlockingScheduler def tick(): @@ -12,9 +12,10 @@ def tick(): if __name__ == '__main__': - scheduler = Scheduler(standalone=True) + scheduler = BlockingScheduler() scheduler.add_job(tick, 'interval', {'seconds': 3}) print('Press Ctrl+C to exit') + try: scheduler.start() except (KeyboardInterrupt, SystemExit): diff --git a/examples/schedulers/gevent_.py b/examples/schedulers/gevent_.py new file mode 100644 index 0000000..597aa4d --- /dev/null +++ b/examples/schedulers/gevent_.py @@ -0,0 +1,24 @@ +""" +Demonstrates how to use the gevent compatible scheduler to schedule a job that executes on 3 second intervals. +""" + +from datetime import datetime + +from apscheduler.schedulers.gevent import GeventScheduler + + +def tick(): + print('Tick! The time is: %s' % datetime.now()) + + +if __name__ == '__main__': + scheduler = GeventScheduler() + scheduler.add_job(tick, 'interval', {'seconds': 3}) + g = scheduler.start() # g is the greenlet that runs the scheduler loop + print('Press Ctrl+C to exit') + + # Execution will block here until Ctrl+C is pressed. + try: + g.join() + except (KeyboardInterrupt, SystemExit): + pass diff --git a/examples/schedulers/qt.py b/examples/schedulers/qt.py new file mode 100644 index 0000000..15b9192 --- /dev/null +++ b/examples/schedulers/qt.py @@ -0,0 +1,37 @@ +""" +Demonstrates how to use the Qt compatible scheduler to schedule a job that executes on 3 second intervals. +""" + +from datetime import datetime +import signal +import sys + +from apscheduler.schedulers.qt import QtScheduler + +try: + from PyQt5.QtWidgets import QApplication, QLabel +except ImportError: + try: + from PyQt4.QtGui import QApplication, QLabel + except ImportError: + from PySide.QtGui import QApplication, QLabel + + +def tick(): + label.setText('Tick! The time is: %s' % datetime.now()) + + +if __name__ == '__main__': + app = QApplication(sys.argv) + signal.signal(signal.SIGINT, lambda *args: QApplication.quit()) # This enables processing of Ctrl+C keypresses + label = QLabel('The timer text will appear here in a moment!') + label.setWindowTitle('QtScheduler example') + label.setFixedSize(280, 50) + label.show() + + scheduler = QtScheduler() + scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.start() + + # Execution will block here until the user closes the windows or Ctrl+C is pressed. + app.exec_() diff --git a/examples/schedulers/tornado_.py b/examples/schedulers/tornado_.py new file mode 100644 index 0000000..42ddab5 --- /dev/null +++ b/examples/schedulers/tornado_.py @@ -0,0 +1,25 @@ +""" +Demonstrates how to use the Tornado compatible scheduler to schedule a job that executes on 3 second intervals. +""" + +from datetime import datetime + +from tornado.ioloop import IOLoop +from apscheduler.schedulers.tornado import TornadoScheduler + + +def tick(): + print('Tick! The time is: %s' % datetime.now()) + + +if __name__ == '__main__': + scheduler = TornadoScheduler() + scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.start() + print('Press Ctrl+C to exit') + + # Execution will block here until Ctrl+C is pressed. + try: + IOLoop.instance().start() + except (KeyboardInterrupt, SystemExit): + pass diff --git a/examples/schedulers/twisted_.py b/examples/schedulers/twisted_.py new file mode 100644 index 0000000..a1f7358 --- /dev/null +++ b/examples/schedulers/twisted_.py @@ -0,0 +1,25 @@ +""" +Demonstrates how to use the Twisted compatible scheduler to schedule a job that executes on 3 second intervals. +""" + +from datetime import datetime + +from twisted.internet import reactor +from apscheduler.schedulers.twisted import TwistedScheduler + + +def tick(): + print('Tick! The time is: %s' % datetime.now()) + + +if __name__ == '__main__': + scheduler = TwistedScheduler() + scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.start() + print('Press Ctrl+C to exit') + + # Execution will block here until Ctrl+C is pressed. + try: + reactor.run() + except (KeyboardInterrupt, SystemExit): + pass diff --git a/examples/threaded.py b/examples/threaded.py deleted file mode 100644 index 7a11937..0000000 --- a/examples/threaded.py +++ /dev/null @@ -1,25 +0,0 @@ -""" -Basic example showing how the scheduler integrates with the application it's running alongside with. -""" - -from datetime import datetime -import time - -from apscheduler.scheduler import Scheduler - - -def tick(): - print('Tick! The time is: %s' % datetime.now()) - - -if __name__ == '__main__': - scheduler = Scheduler() - scheduler.add_job(tick, 'interval', {'seconds': 3}) - print('Press Ctrl+C to exit') - scheduler.start() - - # This is here to simulate application activity (which keeps the main - # thread alive). - while True: - print('This is the main thread.') - time.sleep(2) diff --git a/tests/conftest.py b/tests/conftest.py index f2dc475..7c4f70b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,6 +10,8 @@ from apscheduler.jobstores.shelve import ShelveJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.redis import RedisJobStore +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.blocking import BlockingScheduler @pytest.fixture diff --git a/tests/test_integration.py b/tests/test_integration.py deleted file mode 100644 index 986f0c6..0000000 --- a/tests/test_integration.py +++ /dev/null @@ -1,47 +0,0 @@ -from time import sleep - -import pytest - -from apscheduler.scheduler import Scheduler -from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_MISSED -from tests.conftest import all_jobstores, all_jobstores_ids - - -def increment(vals, sleeptime): - vals[0] += 1 - sleep(sleeptime) - - -@pytest.fixture(params=all_jobstores, ids=all_jobstores_ids) -def sched(request): - jobstore = request.param(request) - sched = Scheduler() - sched.add_jobstore(jobstore, 'persistent') - sched.start() - request.addfinalizer(sched.shutdown) - return sched - - -def test_overlapping_runs(sched): - # Makes sure that "increment" is only ran once, since it will still be - # running when the next appointed time hits. - - vals = [0] - sched.add_job(increment, 'interval', {'seconds': 1}, args=[vals, 2], jobstore='persistent') - sleep(2.5) - assert vals == [1] - - -def test_max_instances(sched): - vals = [0] - events = [] - sched.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) - sched.add_job(increment, 'interval', {'seconds': 0.3}, max_instances=2, max_runs=4, args=[vals, 1], - jobstore='persistent') - sleep(2.4) - assert vals == [2] - assert len(events) == 4 - assert events[0].code == EVENT_JOB_MISSED - assert events[1].code == EVENT_JOB_MISSED - assert events[2].code == EVENT_JOB_EXECUTED - assert events[3].code == EVENT_JOB_EXECUTED diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py deleted file mode 100644 index 759c73e..0000000 --- a/tests/test_scheduler.py +++ /dev/null @@ -1,465 +0,0 @@ -from datetime import datetime, timedelta -from logging import StreamHandler, ERROR -from io import StringIO -from copy import copy -import os - -from dateutil.tz import tzoffset -import pytest - -from apscheduler.jobstores.memory import MemoryJobStore -from apscheduler.scheduler import Scheduler, SchedulerAlreadyRunningError -from apscheduler.job import Job -from apscheduler.events import (EVENT_JOB_EXECUTED, SchedulerEvent, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN, - EVENT_JOB_MISSED) -from apscheduler import scheduler -from tests.conftest import minpython - - -local_tz = tzoffset('DUMMYTZ', 3600) - - -class FakeThread(object): - def isAlive(self): - return True - - -class FakeThreadPool(object): - def submit(self, func, *args, **kwargs): - func(*args, **kwargs) - - -class DummyException(Exception): - pass - - -class FakeDateTime(datetime): - original_now = datetime(2011, 4, 3, 18, 40, tzinfo=local_tz) - _now = original_now - - @classmethod - def now(cls, timezone=None): - return cls._now - - -class TestOfflineScheduler(object): - @pytest.fixture() - def sched(self, request): - def finish(): - if sched.running: - sched.shutdown() - - sched = Scheduler() - request.addfinalizer(finish) - return sched - - def test_jobstore_twice(self, sched): - with pytest.raises(KeyError): - sched.add_jobstore(MemoryJobStore(), 'dummy') - sched.add_jobstore(MemoryJobStore(), 'dummy') - - def test_add_tentative_job(self, sched): - job = sched.add_job(lambda: None, 'date', [datetime(2200, 7, 24)], jobstore='dummy') - assert isinstance(job, Job) - assert sched.get_jobs() == [] - - def test_add_job_by_reference(self, sched): - job = sched.add_job('copy:copy', 'date', [datetime(2200, 7, 24)], args=[()]) - assert job.func == copy - assert job.func_ref == 'copy:copy' - - def test_configure_jobstore(self, sched): - conf = {'apscheduler.jobstore.memstore.class': 'apscheduler.jobstores.memory:MemoryJobStore'} - sched.configure(conf) - sched.remove_jobstore('memstore') - - def test_shutdown_offline(self, sched): - sched.shutdown() - - def test_configure_no_prefix(self, sched): - global_options = {'misfire_grace_time': '2', 'daemonic': 'false'} - sched.configure(global_options) - assert sched.misfire_grace_time == 1 - assert sched.daemonic is True - - def test_configure_prefix(self, sched): - global_options = {'apscheduler.misfire_grace_time': 2, 'apscheduler.daemonic': False} - sched.configure(global_options) - assert sched.misfire_grace_time == 2 - assert sched.daemonic is False - - def test_add_listener(self, sched): - val = [] - sched.add_listener(val.append) - - event = SchedulerEvent(EVENT_SCHEDULER_START) - sched._notify_listeners(event) - assert len(val) == 1 - assert val[0] == event - - event = SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN) - sched._notify_listeners(event) - assert len(val) == 2 - assert val[1] == event - - sched.remove_listener(val.append) - sched._notify_listeners(event) - assert len(val) == 2 - - def test_pending_jobs(self, sched): - # Tests that pending jobs are properly added to the jobs list when - # the scheduler is started (and not before!) - sched.add_job(lambda: None, 'date', [datetime(9999, 9, 9)]) - assert sched.get_jobs() == [] - - sched.start() - jobs = sched.get_jobs() - assert len(jobs) == 1 - - def test_invalid_callable_args(self, sched): - """Tests that attempting to schedule a job with an invalid number of arguments raises an exception.""" - - with pytest.raises(ValueError) as error: - sched.add_job(lambda x: None, 'date', [datetime(9999, 9, 9)], args=[1, 2]) - - assert str(error.value) == ('The list of positional arguments is longer than the target callable can handle ' - '(allowed: 1, given in args: 2)') - - def test_invalid_callable_kwargs(self, sched): - """Tests that attempting to schedule a job with unmatched keyword arguments raises an exception.""" - - with pytest.raises(ValueError) as error: - sched.add_job(lambda x: None, 'date', [datetime(9999, 9, 9)], kwargs={'x': 0, 'y': 1}) - - assert str(error.value) == 'The target callable does not accept the following keyword arguments: y' - - def test_missing_callable_args(self, sched): - """Tests that attempting to schedule a job with missing arguments raises an exception.""" - - with pytest.raises(ValueError) as error: - sched.add_job(lambda x, y, z: None, 'date', [datetime(9999, 9, 9)], args=[1], kwargs={'y': 0}) - - assert str(error.value) == 'The following arguments are not supplied: z' - - def test_conflicting_callable_args(self, sched): - """Tests that attempting to schedule a job where the combination of args and kwargs are in conflict raises an - exception.""" - - with pytest.raises(ValueError) as error: - sched.add_job(lambda x, y: None, 'date', [datetime(9999, 9, 9)], args=[1, 2], kwargs={'y': 1}) - - assert str(error.value) == 'The following arguments are supplied in both args and kwargs: y' - - @minpython(3) - def test_unfulfilled_kwargs(self, sched): - """Tests that attempting to schedule a job where not all keyword-only arguments are fulfilled raises an - exception.""" - - func = eval("lambda x, *, y, z=1: None") - with pytest.raises(ValueError) as error: - sched.add_job(func, 'date', [datetime(9999, 9, 9)], args=[1]) - - assert str(error.value) == 'The following keyword-only arguments have not been supplied in kwargs: y' - - -class TestJobExecution(object): - @pytest.fixture - def sched(self): - sched = Scheduler(threadpool=FakeThreadPool(), timezone=local_tz) - sched.add_jobstore(MemoryJobStore(), 'default') - - # Make the scheduler think it's running - sched._thread = FakeThread() - return sched - - @pytest.fixture - def logstream(self, request): - stream = StringIO() - loghandler = StreamHandler(stream) - loghandler.setLevel(ERROR) - scheduler.logger.addHandler(loghandler) - request.addfinalizer(lambda: scheduler.logger.removeHandler(loghandler)) - return stream - - @pytest.fixture - def fake_datetime(self, request, monkeypatch): - monkeypatch.setattr(scheduler, 'datetime', FakeDateTime) - request.addfinalizer(lambda: setattr(FakeDateTime, '_now', FakeDateTime.original_now)) - - def test_job_name(self, sched): - def my_job(): - pass - - job = sched.add_job(my_job, 'interval', {'start_date': datetime(2010, 5, 19)}) - assert (repr(job) == - "<Job (name=my_job, trigger=<IntervalTrigger (interval=datetime.timedelta(0, 1), " - "start_date='2010-05-19 00:00:00 DUMMYTZ')>)>") - - def test_schedule_object(self, sched): - # Tests that any callable object is accepted (and not just functions) - class A: - def __init__(self): - self.val = 0 - - def __call__(self): - self.val += 1 - - a = A() - job = sched.add_job(a, 'interval', {'seconds': 1}) - sched._process_jobs(job.next_run_time) - sched._process_jobs(job.next_run_time) - assert a.val == 2 - - def test_schedule_method(self, sched): - # Tests that bound methods can be scheduled (at least with RAMJobStore) - class A: - def __init__(self): - self.val = 0 - - def method(self): - self.val += 1 - - a = A() - job = sched.add_job(a.method, 'interval', {'seconds': 1}) - sched._process_jobs(job.next_run_time) - sched._process_jobs(job.next_run_time) - assert a.val == 2 - - def test_unschedule_job(self, sched): - def increment(): - vals[0] += 1 - - vals = [0] - job = sched.add_job(increment, 'cron') - sched._process_jobs(job.next_run_time) - assert vals[0] == 1 - sched.unschedule_job(job) - sched._process_jobs(job.next_run_time) - assert vals[0] == 1 - - def test_unschedule_func(self, sched): - def increment(): - vals[0] += 1 - - def increment2(): - vals[0] += 1 - - vals = [0] - job1 = sched.add_job(increment, 'cron') - job2 = sched.add_job(increment2, 'cron') - job3 = sched.add_job(increment, 'cron') - assert sched.get_jobs() == [job1, job2, job3] - - sched.unschedule_func(increment) - assert sched.get_jobs() == [job2] - - def test_unschedule_func_notfound(self, sched): - pytest.raises(KeyError, sched.unschedule_func, copy) - - def test_job_finished(self, sched): - def increment(): - vals[0] += 1 - - vals = [0] - job = sched.add_job(increment, 'interval', max_runs=1) - sched._process_jobs(job.next_run_time) - assert vals == [1] - assert job not in sched.get_jobs() - - def test_job_exception(self, sched, logstream): - def failure(): - raise DummyException - - job = sched.add_job(failure, 'date', [datetime(9999, 9, 9)]) - sched._process_jobs(job.next_run_time) - assert 'DummyException' in logstream.getvalue() - - def test_misfire_grace_time(self, sched): - sched.misfire_grace_time = 3 - job = sched.add_job(lambda: None, 'interval', {'seconds': 1}) - assert job.misfire_grace_time == 3 - - job = sched.add_job(lambda: None, 'interval', {'seconds': 1}, misfire_grace_time=2) - assert job.misfire_grace_time == 2 - - def test_coalesce_on(self, sched, fake_datetime): - # Makes sure that the job is only executed once when it is scheduled - # to be executed twice in a row - def increment(): - vals[0] += 1 - - vals = [0] - events = [] - sched.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) - job = sched.add_job(increment, 'interval', {'seconds': 1, 'start_date': FakeDateTime.now()}, - coalesce=True, misfire_grace_time=2) - - # Turn the clock 14 seconds forward - FakeDateTime._now += timedelta(seconds=2) - - sched._process_jobs(FakeDateTime.now()) - assert job.runs == 1 - assert len(events) == 1 - assert events[0].code == EVENT_JOB_EXECUTED - assert vals == [1] - - def test_coalesce_off(self, sched, fake_datetime): - # Makes sure that every scheduled run for the job is executed even - # when they are in the past (but still within misfire_grace_time) - def increment(): - vals[0] += 1 - - vals = [0] - events = [] - sched.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) - job = sched.add_job(increment, 'interval', {'seconds': 1, 'start_date': FakeDateTime.now()}, - coalesce=False, misfire_grace_time=2) - - # Turn the clock 2 seconds forward - FakeDateTime._now += timedelta(seconds=2) - - sched._process_jobs(FakeDateTime.now()) - assert job.runs == 3 - assert len(events) == 3 - assert events[0].code == EVENT_JOB_EXECUTED - assert events[1].code == EVENT_JOB_EXECUTED - assert events[2].code == EVENT_JOB_EXECUTED - assert vals == [3] - - def test_interval(self, sched): - def increment(amount): - vals[0] += amount - vals[1] += 1 - - vals = [0, 0] - job = sched.add_job(increment, 'interval', {'seconds': 1}, args=[2]) - sched._process_jobs(job.next_run_time) - sched._process_jobs(job.next_run_time) - assert vals == [4, 2] - - def test_interval_schedule(self, sched): - @sched.scheduled_job('interval', {'seconds': 1}) - def increment(): - vals[0] += 1 - - vals = [0] - start = increment.job.next_run_time - sched._process_jobs(start) - sched._process_jobs(start + timedelta(seconds=1)) - assert vals == [2] - - def test_cron(self, sched): - def increment(amount): - vals[0] += amount - vals[1] += 1 - - vals = [0, 0] - job = sched.add_job(increment, 'cron', args=[3]) - start = job.next_run_time - sched._process_jobs(start) - assert vals == [3, 1] - sched._process_jobs(start + timedelta(seconds=1)) - assert vals == [6, 2] - sched._process_jobs(start + timedelta(seconds=2)) - assert vals == [9, 3] - - def test_cron_schedule_1(self, sched): - @sched.scheduled_job('cron') - def increment(): - vals[0] += 1 - - vals = [0] - start = increment.job.next_run_time - sched._process_jobs(start) - sched._process_jobs(start + timedelta(seconds=1)) - assert vals[0] == 2 - - def test_cron_schedule_2(self, sched): - @sched.scheduled_job('cron', {'minute': '*'}) - def increment(): - vals[0] += 1 - - vals = [0] - start = increment.job.next_run_time - next_run = start + timedelta(seconds=60) - assert increment.job.get_run_times(next_run) == [start, next_run] - sched._process_jobs(start) - sched._process_jobs(next_run) - assert vals[0] == 2 - - def test_date(self, sched): - def append_val(value): - vals.append(value) - - vals = [] - date = datetime.now(local_tz) + timedelta(seconds=1) - sched.add_job(append_val, 'date', [date], kwargs={'value': 'test'}) - sched._process_jobs(date) - assert vals == ['test'] - - def test_print_jobs(self, sched): - out = StringIO() - sched.print_jobs(out) - expected = 'Jobstore default:%s'\ - ' No scheduled jobs%s' % (os.linesep, os.linesep) - assert out.getvalue() == expected - - sched.add_job(copy, 'date', [datetime(2200, 5, 19)], args=[()]) - out = StringIO() - sched.print_jobs(out) - expected = 'Jobstore default:%s '\ - 'copy (trigger: date[2200-05-19 00:00:00 DUMMYTZ], '\ - 'next run at: 2200-05-19 00:00:00 DUMMYTZ)%s' % (os.linesep, os.linesep) - assert out.getvalue() == expected - - def test_jobstore(self, sched): - sched.add_jobstore(MemoryJobStore(), 'dummy') - job = sched.add_job(lambda: None, 'date', [datetime(2200, 7, 24)], jobstore='dummy') - assert sched.get_jobs() == [job] - sched.remove_jobstore('dummy') - assert sched.get_jobs() == [] - - def test_remove_nonexistent_jobstore(self, sched): - pytest.raises(KeyError, sched.remove_jobstore, 'dummy2') - - def test_job_next_run_time(self, sched): - # Tests against bug #5 - def increment(): - vars[0] += 1 - - vars = [0] - scheduler.datetime = FakeDateTime - job = sched.add_job(increment, 'interval', {'seconds': 1, 'start_date': FakeDateTime.now()}, - misfire_grace_time=3) - start = job.next_run_time - - sched._process_jobs(start) - assert vars == [1] - - sched._process_jobs(start) - assert vars == [1] - - sched._process_jobs(start + timedelta(seconds=1)) - assert vars == [2] - - -class TestRunningScheduler(object): - @pytest.fixture - def sched(self, request): - sched = Scheduler() - sched.start() - request.addfinalizer(sched.shutdown) - return sched - - def test_shutdown_timeout(self, sched): - sched.shutdown() - - def test_scheduler_double_start(self, sched): - pytest.raises(SchedulerAlreadyRunningError, sched.start) - - def test_scheduler_configure_running(self, sched): - pytest.raises(SchedulerAlreadyRunningError, sched.configure, {}) - - def test_scheduler_double_shutdown(self, sched): - sched.shutdown() - sched.shutdown(False) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py new file mode 100644 index 0000000..0e9ef5f --- /dev/null +++ b/tests/test_schedulers.py @@ -0,0 +1,577 @@ +from datetime import datetime, timedelta +from logging import StreamHandler, ERROR +from io import StringIO +from copy import copy +from threading import Event, Thread +from time import sleep +import os + +from dateutil.tz import tzoffset +import pytest + +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.base import BaseScheduler +from apscheduler.job import Job +from apscheduler.events import (SchedulerEvent, EVENT_JOB_EXECUTED, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN, + EVENT_JOB_MISSED, EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_JOB_ADDED, + EVENT_JOBSTORE_JOB_REMOVED) +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.threadpool import ThreadPool +from tests.conftest import minpython + +dummy_tz = tzoffset('DUMMYTZ', 3600) +dummy_datetime = datetime(2011, 4, 3, 18, 40, tzinfo=dummy_tz) + + +class DummyThreadPool(object): + def submit(self, func, *args, **kwargs): + func(*args, **kwargs) + + def shutdown(self, wait): + pass + + +class DummyException(Exception): + pass + + +class DummyScheduler(BaseScheduler): + def __init__(self, gconfig={}, **options): + super(DummyScheduler, self).__init__(gconfig, timezone=dummy_tz, threadpool=DummyThreadPool(), **options) + self.now = dummy_datetime + + def start(self): + super(DummyScheduler, self).start() + + def shutdown(self, wait=True): + super(DummyScheduler, self).shutdown() + + def _wakeup(self): + pass + + def _current_time(self): + return self.now + + +def increment(vals): + vals[0] += 1 + + +@pytest.fixture +def scheduler(request): + sched = DummyScheduler() + if 'start_scheduler' in request.keywords: + sched.start() + request.addfinalizer(lambda: sched.shutdown() if sched.running else None) + + return sched + + +@pytest.fixture +def logstream(request, scheduler): + stream = StringIO() + loghandler = StreamHandler(stream) + loghandler.setLevel(ERROR) + scheduler.logger.addHandler(loghandler) + request.addfinalizer(lambda: scheduler.logger.removeHandler(loghandler)) + return stream + + +class TestOfflineScheduler(object): + def test_jobstore_twice(self, scheduler): + with pytest.raises(KeyError): + scheduler.add_jobstore(MemoryJobStore(), 'dummy') + scheduler.add_jobstore(MemoryJobStore(), 'dummy') + + def test_add_tentative_job(self, scheduler): + job = scheduler.add_job(lambda: None, 'date', [datetime(2200, 7, 24)], jobstore='dummy') + assert isinstance(job, Job) + assert scheduler.get_jobs() == [] + + def test_add_job_by_reference(self, scheduler): + job = scheduler.add_job('copy:copy', 'date', [datetime(2200, 7, 24)], args=[()]) + assert job.func == copy + assert job.func_ref == 'copy:copy' + + def test_configure_jobstore(self, scheduler): + conf = {'apscheduler.jobstore.memstore.class': 'apscheduler.jobstores.memory:MemoryJobStore'} + scheduler.configure(conf) + scheduler.remove_jobstore('memstore') + + def test_shutdown_offline(self, scheduler): + pytest.raises(SchedulerNotRunningError, scheduler.shutdown) + + def test_configure_no_prefix(self, scheduler): + global_options = {'misfire_grace_time': '2', 'coalesce': 'false'} + scheduler.configure(global_options) + assert scheduler.misfire_grace_time == 1 + assert scheduler.coalesce is True + + def test_configure_prefix(self, scheduler): + global_options = {'apscheduler.misfire_grace_time': 2, 'apscheduler.coalesce': False} + scheduler.configure(global_options) + assert scheduler.misfire_grace_time == 2 + assert scheduler.coalesce is False + + def test_add_listener(self, scheduler): + val = [] + scheduler.add_listener(val.append) + + event = SchedulerEvent(EVENT_SCHEDULER_START) + scheduler._notify_listeners(event) + assert len(val) == 1 + assert val[0] == event + + event = SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN) + scheduler._notify_listeners(event) + assert len(val) == 2 + assert val[1] == event + + scheduler.remove_listener(val.append) + scheduler._notify_listeners(event) + assert len(val) == 2 + + def test_pending_jobs(self, scheduler): + """Tests that pending jobs are properly added to the jobs list, but only when the scheduler is started.""" + + scheduler.add_job(lambda: None, 'date', [datetime(9999, 9, 9)]) + assert scheduler.get_jobs() == [] + + scheduler.start() + jobs = scheduler.get_jobs() + assert len(jobs) == 1 + + def test_invalid_callable_args(self, scheduler): + """Tests that attempting to schedule a job with an invalid number of arguments raises an exception.""" + + with pytest.raises(ValueError) as error: + scheduler.add_job(lambda x: None, 'date', [datetime(9999, 9, 9)], args=[1, 2]) + + assert str(error.value) == ('The list of positional arguments is longer than the target callable can handle ' + '(allowed: 1, given in args: 2)') + + def test_invalid_callable_kwargs(self, scheduler): + """Tests that attempting to schedule a job with unmatched keyword arguments raises an exception.""" + + with pytest.raises(ValueError) as error: + scheduler.add_job(lambda x: None, 'date', [datetime(9999, 9, 9)], kwargs={'x': 0, 'y': 1}) + + assert str(error.value) == 'The target callable does not accept the following keyword arguments: y' + + def test_missing_callable_args(self, scheduler): + """Tests that attempting to schedule a job with missing arguments raises an exception.""" + + with pytest.raises(ValueError) as error: + scheduler.add_job(lambda x, y, z: None, 'date', [datetime(9999, 9, 9)], args=[1], kwargs={'y': 0}) + + assert str(error.value) == 'The following arguments are not supplied: z' + + def test_conflicting_callable_args(self, scheduler): + """Tests that attempting to schedule a job where the combination of args and kwargs are in conflict raises an + exception.""" + + with pytest.raises(ValueError) as error: + scheduler.add_job(lambda x, y: None, 'date', [datetime(9999, 9, 9)], args=[1, 2], kwargs={'y': 1}) + + assert str(error.value) == 'The following arguments are supplied in both args and kwargs: y' + + @minpython(3) + def test_unfulfilled_kwargs(self, scheduler): + """Tests that attempting to schedule a job where not all keyword-only arguments are fulfilled raises an + exception.""" + + func = eval("lambda x, *, y, z=1: None") + with pytest.raises(ValueError) as error: + scheduler.add_job(func, 'date', [datetime(9999, 9, 9)], args=[1]) + + assert str(error.value) == 'The following keyword-only arguments have not been supplied in kwargs: y' + + +@pytest.mark.start_scheduler +class TestJobExecution(object): + def test_job_name(self, scheduler): + def my_job(): + pass + + job = scheduler.add_job(my_job, 'interval', {'start_date': datetime(2010, 5, 19)}) + assert (repr(job) == + "<Job (name=my_job, trigger=<IntervalTrigger (interval=datetime.timedelta(0, 1), " + "start_date='2010-05-19 00:00:00 DUMMYTZ')>)>") + + def test_schedule_object(self, scheduler): + """Tests that any callable object is accepted (and not just functions).""" + + class A(object): + def __init__(self): + self.val = 0 + + def __call__(self): + self.val += 1 + + a = A() + job = scheduler.add_job(a, 'interval', {'seconds': 1}) + scheduler.now = job.next_run_time + scheduler._process_jobs() + assert a.val == 1 + + def test_schedule_method(self, scheduler): + """Tests that bound methods can be scheduled (at least with MemoryJobStore).""" + + class A: + def __init__(self): + self.val = 0 + + def method(self): + self.val += 1 + + a = A() + job = scheduler.add_job(a.method, 'interval', {'seconds': 1}) + scheduler.now = job.next_run_time + scheduler._process_jobs() + assert a.val == 1 + + def test_unschedule_job(self, scheduler): + vals = [0] + job = scheduler.add_job(increment, 'cron', args=(vals,)) + scheduler.now = job.next_run_time + scheduler._process_jobs() + assert vals[0] == 1 + + scheduler.unschedule_job(job) + scheduler._process_jobs() + assert vals[0] == 1 + + def test_unschedule_func(self, scheduler): + vals = [0] + job1 = scheduler.add_job(increment, 'cron', args=(vals,)) + job2 = scheduler.add_job(lambda: None, 'cron') + job3 = scheduler.add_job(increment, 'cron', args=(vals,)) + assert scheduler.get_jobs() == [job1, job2, job3] + + scheduler.unschedule_func(increment) + assert scheduler.get_jobs() == [job2] + + def test_unschedule_func_notfound(self, scheduler): + pytest.raises(KeyError, scheduler.unschedule_func, copy) + + def test_job_finished(self, scheduler): + vals = [0] + job = scheduler.add_job(increment, 'interval', args=(vals,), max_runs=1) + scheduler.now = job.next_run_time + scheduler._process_jobs() + assert vals == [1] + assert job not in scheduler.get_jobs() + + def test_job_exception(self, scheduler, logstream): + def failure(): + raise DummyException + + job = scheduler.add_job(failure, 'date', [datetime(9999, 9, 9)]) + scheduler.now = job.next_run_time + scheduler._process_jobs() + assert 'DummyException' in logstream.getvalue() + + def test_misfire_grace_time(self, scheduler): + scheduler.misfire_grace_time = 3 + job = scheduler.add_job(lambda: None, 'interval', {'seconds': 1}) + assert job.misfire_grace_time == 3 + + job = scheduler.add_job(lambda: None, 'interval', {'seconds': 1}, misfire_grace_time=2) + assert job.misfire_grace_time == 2 + + def test_coalesce_on(self, scheduler): + """Tests that the job is only executed once when it is scheduled to be executed twice in a row.""" + + vals = [0] + events = [] + scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) + job = scheduler.add_job(increment, 'interval', {'seconds': 1, 'start_date': dummy_datetime}, + args=(vals,), coalesce=True, misfire_grace_time=2) + + # Turn the clock 2 seconds forward + scheduler.now += timedelta(seconds=2) + + scheduler._process_jobs() + assert job.runs == 1 + assert len(events) == 1 + assert events[0].code == EVENT_JOB_EXECUTED + assert vals == [1] + + def test_coalesce_off(self, scheduler): + """Tests that every scheduled run for the job is executed even when they are in the past + (but still within misfire_grace_time). + """ + + vals = [0] + events = [] + scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) + job = scheduler.add_job(increment, 'interval', {'seconds': 1, 'start_date': dummy_datetime}, + args=(vals,), coalesce=False, misfire_grace_time=2) + + # Turn the clock 2 seconds forward + scheduler.now += timedelta(seconds=2) + + scheduler._process_jobs() + assert job.runs == 3 + assert len(events) == 3 + assert events[0].code == EVENT_JOB_EXECUTED + assert events[1].code == EVENT_JOB_EXECUTED + assert events[2].code == EVENT_JOB_EXECUTED + assert vals == [3] + + def test_print_jobs(self, scheduler): + out = StringIO() + scheduler.print_jobs(out) + expected = 'Jobstore default:%s'\ + ' No scheduled jobs%s' % (os.linesep, os.linesep) + assert out.getvalue() == expected + + scheduler.add_job(copy, 'date', [datetime(2200, 5, 19)], args=[()]) + out = StringIO() + scheduler.print_jobs(out) + expected = 'Jobstore default:%s '\ + 'copy (trigger: date[2200-05-19 00:00:00 DUMMYTZ], '\ + 'next run at: 2200-05-19 00:00:00 DUMMYTZ)%s' % (os.linesep, os.linesep) + assert out.getvalue() == expected + + def test_jobstore(self, scheduler): + scheduler.add_jobstore(MemoryJobStore(), 'dummy') + job = scheduler.add_job(lambda: None, 'date', [datetime(2200, 7, 24)], jobstore='dummy') + assert scheduler.get_jobs() == [job] + scheduler.remove_jobstore('dummy') + assert scheduler.get_jobs() == [] + + def test_remove_nonexistent_jobstore(self, scheduler): + """Tests that KeyError is raised when trying to remove a job store that doesn't exist.""" + + pytest.raises(KeyError, scheduler.remove_jobstore, 'dummy2') + + def test_job_next_run_time(self, scheduler): + """Tests against bug #5.""" + + vals = [0] + job = scheduler.add_job(increment, 'interval', {'seconds': 1, 'start_date': dummy_datetime}, + args=(vals,), misfire_grace_time=3) + + scheduler.now = job.next_run_time + scheduler._process_jobs() + assert vals == [1] + + scheduler._process_jobs() + assert vals == [1] + + scheduler.now = job.next_run_time + timedelta(seconds=1) + scheduler._process_jobs() + assert vals == [2] + + def test_max_instances(self, scheduler): + """Tests that the maximum instance limit on a job is respected and that missed job events are dispatched when + the job cannot be run due to the instance limitation. + """ + + def wait_event(): + vals[0] += 1 + event.wait(2) + + vals = [0] + events = [] + event = Event() + shutdown_event = Event() + scheduler._threadpool = ThreadPool() + scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) + scheduler.add_listener(lambda e: shutdown_event.set(), EVENT_SCHEDULER_SHUTDOWN) + scheduler.add_job(wait_event, 'interval', {'seconds': 1, 'start_date': dummy_datetime}, max_instances=2, + max_runs=4) + for _ in range(4): + scheduler._process_jobs() + scheduler.now += timedelta(seconds=1) + event.set() + scheduler.shutdown() + shutdown_event.wait(2) + + assert vals == [2] + assert len(events) == 4 + assert events[0].code == EVENT_JOB_MISSED + assert events[1].code == EVENT_JOB_MISSED + assert events[2].code == EVENT_JOB_EXECUTED + assert events[3].code == EVENT_JOB_EXECUTED + + +@pytest.mark.start_scheduler +class TestRunningScheduler(object): + def test_shutdown_timeout(self, scheduler): + scheduler.shutdown() + + def test_scheduler_double_start(self, scheduler): + pytest.raises(SchedulerAlreadyRunningError, scheduler.start) + + def test_scheduler_configure_running(self, scheduler): + pytest.raises(SchedulerAlreadyRunningError, scheduler.configure, {}) + + def test_scheduler_double_shutdown(self, scheduler): + scheduler.shutdown() + pytest.raises(SchedulerNotRunningError, scheduler.shutdown, False) + + +class SchedulerImplementationTestBase(object): + @pytest.fixture + def scheduler(self, request): + sched = self.create_scheduler() + request.addfinalizer(lambda: self.finish(sched)) + return sched + + def create_scheduler(self): + raise NotImplementedError + + def create_event(self): + return Event() + + def process_events(self): + pass + + def finish(self, sched): + pass + + def test_scheduler_implementation(self, scheduler): + """Tests that starting the scheduler eventually calls _process_jobs().""" + + class TimeRoller(object): + def __init__(self, start, step): + self.now = start + self.step = timedelta(seconds=step) + + def next(self): + return self.now + self.step + + def __call__(self): + now = self.now + self.now = self.next() + return now + + events = [] + vals = [0] + job_removed_event = self.create_event() + shutdown_event = self.create_event() + scheduler._threadpool = DummyThreadPool() + + # Test that pending jobs are added (and if due, executed) when the scheduler starts + scheduler._current_time = time_roller = TimeRoller(dummy_datetime, 0.2) + scheduler.add_listener(events.append) + scheduler.add_listener(lambda e: job_removed_event.set(), EVENT_JOBSTORE_JOB_REMOVED) + scheduler.add_job(increment, 'date', [time_roller.next()], args=(vals,)) + scheduler.start() + self.process_events() + job_removed_event.wait(2) + assert job_removed_event.is_set() + assert vals[0] == 1 + assert len(events) == 5 + assert events[0].code == EVENT_JOBSTORE_ADDED + assert events[1].code == EVENT_JOBSTORE_JOB_ADDED + assert events[2].code == EVENT_SCHEDULER_START + assert events[3].code == EVENT_JOB_EXECUTED + assert events[4].code == EVENT_JOBSTORE_JOB_REMOVED + del events[:] + job_removed_event.clear() + + # Test that adding a job causes it to be executed after the specified delay + job = scheduler.add_job(increment, 'date', [time_roller.next() + time_roller.step * 2], args=(vals,)) + self.process_events() + sleep(0.5) + self.process_events() + job_removed_event.wait(2) + assert job_removed_event.is_set() + assert vals[0] == 2 + assert len(events) == 3 + assert events[0].code == EVENT_JOBSTORE_JOB_ADDED + assert events[1].code == EVENT_JOB_EXECUTED + assert events[2].code == EVENT_JOBSTORE_JOB_REMOVED + del events[:] + job_removed_event.clear() + + # Test that shutting down the scheduler emits the proper event + scheduler.add_listener(lambda e: shutdown_event.set(), EVENT_SCHEDULER_SHUTDOWN) + scheduler.shutdown() + self.process_events() + shutdown_event.wait(2) + assert shutdown_event.is_set() + assert len(events) == 1 + assert events[0].code == EVENT_SCHEDULER_SHUTDOWN + + +class TestBlockingScheduler(SchedulerImplementationTestBase): + def create_scheduler(self): + sched = BlockingScheduler() + self.thread = Thread(target=sched.start) + sched.start = self.thread.start + return sched + + def finish(self, sched): + self.thread.join() + + +class TestBackgroundScheduler(SchedulerImplementationTestBase): + def create_scheduler(self): + return BackgroundScheduler() + + +class TestAsyncIOScheduler(SchedulerImplementationTestBase): + def create_scheduler(self): + asyncio = pytest.importorskip('apscheduler.schedulers.asyncio') + sched = asyncio.AsyncIOScheduler() + self.thread = Thread(target=sched._eventloop.run_forever) + self.thread.start() + return sched + + def finish(self, sched): + sched._eventloop.call_soon_threadsafe(sched._eventloop.stop) + self.thread.join() + + +class TestGeventScheduler(SchedulerImplementationTestBase): + def create_scheduler(self): + gevent = pytest.importorskip('apscheduler.schedulers.gevent') + return gevent.GeventScheduler() + + def create_event(self): + from gevent.event import Event + return Event() + + +class TestTornadoScheduler(SchedulerImplementationTestBase): + def create_scheduler(self): + tornado = pytest.importorskip('apscheduler.schedulers.tornado') + sched = tornado.TornadoScheduler() + self.thread = Thread(target=sched._ioloop.start) + self.thread.start() + return sched + + def finish(self, sched): + sched._ioloop.add_callback(sched._ioloop.stop) + self.thread.join() + + +class TestTwistedScheduler(SchedulerImplementationTestBase): + def create_scheduler(self): + twisted = pytest.importorskip('apscheduler.schedulers.twisted') + sched = twisted.TwistedScheduler() + self.thread = Thread(target=sched._reactor.run, args=(False,)) + self.thread.start() + return sched + + def finish(self, sched): + sched._reactor.callFromThread(sched._reactor.stop) + self.thread.join() + + +class TestQtScheduler(SchedulerImplementationTestBase): + def create_scheduler(self): + qt = pytest.importorskip('apscheduler.schedulers.qt') + from PySide.QtCore import QCoreApplication + QCoreApplication([]) + return qt.QtScheduler() + + def process_events(self): + from PySide.QtCore import QCoreApplication + QCoreApplication.processEvents() diff --git a/tests/test_util.py b/tests/test_util.py index 64ca00a..e081ec8 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,16 +1,15 @@ # coding: utf-8 -from datetime import date, datetime, timedelta +from datetime import date, datetime, timedelta, tzinfo import time import os import shelve -from dateutil.tz import tzoffset +from dateutil.tz import tzoffset, gettz import pytest from apscheduler.util import * from tests.conftest import minpython - local_tz = tzoffset('DUMMYTZ', 3600) @@ -35,10 +34,6 @@ class DummyClass(object): pass -def meth(): - pass - - def test_asint_invalid_1(): pytest.raises(ValueError, asint, '5s') @@ -71,6 +66,25 @@ def test_asbool_fail(): pytest.raises(ValueError, asbool, 'yep') +def test_astimezone_str(): + value = astimezone('Europe/Helsinki') + assert isinstance(value, tzinfo) + + +def test_astimezone_tz(): + tz = gettz('Europe/Helsinki') + value = astimezone(tz) + assert tz is value + + +def test_astimezone_none(): + assert astimezone(None) is None + + +def test_astimezone_fail(): + pytest.raises(TypeError, astimezone, 4) + + def test_convert_datetime_date(): dateval = date(2009, 8, 1) datetimeval = convert_to_datetime(dateval, local_tz, None) @@ -1,19 +1,39 @@ [tox] -envlist = py26,py27,py32,py33,jython,pypy,pep8 +envlist = py26,py27,py32,py33,py34,pypy,pep8 -[testenv] +[base] deps=pytest pytest-cov sqlalchemy pymongo redis + +[testenv] +deps={[base]deps} + twisted + gevent + tornado commands=py.test [] -# Jython does not have SQLite, so don't download SQLAlchemy. -[testenv:jython] -deps=nose - pymongo - redis +[testenv:py32] +deps={[testenv:py34]deps} + cython + https://github.com/fantix/gevent/archive/master.zip + +[testenv:py33] +deps={[testenv:py34]deps} + asyncio + cython + https://github.com/fantix/gevent/archive/master.zip + +[testenv:py34] +deps={[base]deps} + tornado + +[testenv:pypy] +deps={[base]deps} + tornado + twisted [testenv:pep8] deps=pep8 |