diff options
29 files changed, 652 insertions, 641 deletions
@@ -9,3 +9,4 @@ dist/ build/ .coverage .cache/ +example.sqlite diff --git a/apscheduler/executors/__init__.py b/apscheduler/executors/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/apscheduler/executors/__init__.py diff --git a/apscheduler/executors/base.py b/apscheduler/executors/base.py new file mode 100644 index 0000000..e9c5d55 --- /dev/null +++ b/apscheduler/executors/base.py @@ -0,0 +1,29 @@ +from abc import ABCMeta, abstractmethod + +import six + + +class MaxInstancesReachedError(Exception): + pass + + +class BaseExecutor(six.with_metaclass(ABCMeta, object)): + """Base class of all executors.""" + + @abstractmethod + def submit_job(self, job, run_times): + """ + Submits job for execution. + + :param job: job to execute + :param run_times: list of `~datetime.datetime` objects specifying when the job should have been run + :type job: `~apscheduler.scheduler.job.Job` + :type run_times: list + """ + + def shutdown(self, wait=True): + """ + Shuts down this executor. + + :param wait: ``True`` to wait until all submitted jobs have been executed + """ diff --git a/apscheduler/executors/pool.py b/apscheduler/executors/pool.py new file mode 100644 index 0000000..f253168 --- /dev/null +++ b/apscheduler/executors/pool.py @@ -0,0 +1,87 @@ +from collections import defaultdict +from datetime import timedelta, datetime +from functools import partial +import concurrent.futures +import logging +import sys + +from dateutil.tz import tzutc + +from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError +from apscheduler.events import JobEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED + +logger = logging.getLogger(__name__) +utc = tzutc() + + +def run_job(job, run_times): + events = [] + for run_time in run_times: + # See if the job missed its run time window, and handle possible misfires accordingly + difference = datetime.now(utc) - run_time + grace_time = timedelta(seconds=job.misfire_grace_time) + if difference > grace_time: + events.append(JobEvent(EVENT_JOB_MISSED, job, run_time)) + logger.warning('Run time of job "%s" was missed by %s', job, difference) + continue + + logger.info('Running job "%s" (scheduled at %s)', job, run_time) + try: + retval = job.func(*job.args, **job.kwargs) + except: + exc, tb = sys.exc_info()[1:] + events.append(JobEvent(EVENT_JOB_ERROR, job, run_time, exception=exc, traceback=tb)) + logger.exception('Job "%s" raised an exception', job) + else: + events.append(JobEvent(EVENT_JOB_EXECUTED, job, run_time, retval=retval)) + logger.info('Job "%s" executed successfully', job) + + return events + + +class BasePoolExecutor(BaseExecutor): + def __init__(self, scheduler, executor, logger=None): + super(BasePoolExecutor, self).__init__() + self._instances = defaultdict(lambda: 0) + self._scheduler = scheduler + self._executor = executor + self._logger = logger or logging.getLogger(__name__) + self._lock = scheduler._create_lock() + + def submit_job(self, job, run_times): + with self._lock: + if self._instances[job.id] >= job.max_instances: + raise MaxInstancesReachedError + f = self._executor.submit(run_job, job, run_times) + f.add_done_callback(partial(self.job_finished, job.id)) + self._instances[job.id] += 1 + + def job_finished(self, job_id, future): + with self._lock: + self._instances[job_id] -= 1 + + # Dispatch all the events that the worker produced while running the target function + try: + events = future.result() + except Exception as e: + logger.exception('Error executing job %s' % job_id) + else: + for event in events: + self._scheduler._notify_listeners(event) + + def shutdown(self, wait=True): + self._executor.shutdown(wait) + + +class ThreadPoolExecutor(BasePoolExecutor): + def __init__(self, scheduler, max_workers=20): + executor = concurrent.futures.ThreadPoolExecutor(max_workers) + logger = logging.getLogger(__name__) + super(ThreadPoolExecutor, self).__init__(scheduler, executor, logger) + + +class ProcessPoolExecutor(BasePoolExecutor): + def __init__(self, scheduler, max_workers=5): + executor = concurrent.futures.ProcessPoolExecutor(max_workers) + logger = logging.getLogger(__name__) + super(ProcessPoolExecutor, self).__init__(scheduler, executor, logger) diff --git a/apscheduler/job.py b/apscheduler/job.py index 1a3a57c..12a8b14 100644 --- a/apscheduler/job.py +++ b/apscheduler/job.py @@ -1,6 +1,5 @@ from collections import Iterable, Mapping from inspect import isfunction, ismethod -from threading import Lock from datetime import timedelta, datetime from uuid import uuid4 @@ -15,31 +14,29 @@ except ImportError: from inspect import getargspec -class MaxInstancesReachedError(Exception): - pass - - class Job(object): """ Encapsulates the actual Job along with its metadata. This class is used internally by APScheduler, and should never be instantiated by the user. """ - __slots__ = ('_lock', 'id', 'trigger', 'func', 'func_ref', 'args', 'kwargs', 'name', 'misfire_grace_time', - 'coalesce', 'max_runs', 'max_instances', 'runs', 'instances', 'next_run_time') + __slots__ = ('id', 'trigger', 'executor', 'func', 'func_ref', 'args', 'kwargs', 'name', 'misfire_grace_time', + 'coalesce', 'max_runs', 'max_instances', 'runs', 'next_run_time') trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers')) trigger_classes = {} - def __init__(self, **kwargs): + def __init__(self, id=None, **kwargs): super(Job, self).__init__() - self.instances = 0 self.runs = 0 self.next_run_time = None + if id and not isinstance(id, six.string_types): + raise TypeError("id must be a nonempty string") + self.id = id or uuid4().hex + changes = self.validate_changes(kwargs) self.modify(changes) - self._lock = Lock() def modify(self, changes): for key, value in six.iteritems(changes): @@ -50,14 +47,6 @@ class Job(object): approved = {} - if 'id' in changes: - value = changes.pop('id') - if value is None: - value = uuid4().hex - elif not isinstance(value, six.string_types): - raise TypeError("id must be a nonempty string") - approved['id'] = value - if 'func' in changes or 'args' in changes or 'kwargs' in changes: func = changes.pop('func') if 'func' in changes else self.func args = changes.pop('args') if 'args' in changes else self.args @@ -147,6 +136,12 @@ class Job(object): approved['trigger'] = trigger + if 'executor' in changes: + value = changes.pop('executor') + if not isinstance(value, six.string_types): + raise TypeError('executor must be a string') + approved['executor'] = value + if 'next_run_time' in changes: value = changes.pop('next_run_time') if not isinstance(value, datetime): @@ -215,23 +210,13 @@ class Job(object): return run_times - def add_instance(self): - with self._lock: - if self.instances == self.max_instances: - raise MaxInstancesReachedError - self.instances += 1 - - def remove_instance(self): - with self._lock: - assert self.instances > 0, 'Already at 0 instances' - self.instances -= 1 - def __getstate__(self): return { 'version': 1, 'id': self.id, 'func': self.func_ref, 'trigger': self.trigger, + 'executor': self.executor, 'args': self.args, 'kwargs': self.kwargs, 'name': self.name, @@ -245,13 +230,13 @@ class Job(object): def __setstate__(self, state): if state.get('version', 1) > 1: - raise ValueError('Job has version %s, but only version 1 and lower can be handled' % state['version']) + raise ValueError('Job has version %s, but only version 1 can be handled' % state['version']) - self._lock = Lock() self.id = state['id'] self.func_ref = state['func'] self.func = ref_to_obj(self.func_ref) self.trigger = state['trigger'] + self.executor = state['executor'] self.args = state['args'] self.kwargs = state['kwargs'] self.name = state['name'] @@ -261,7 +246,6 @@ class Job(object): self.max_instances = state['max_instances'] self.runs = state['runs'] self.next_run_time = state['next_run_time'] - self.instances = 0 def __eq__(self, other): if isinstance(other, Job): diff --git a/apscheduler/jobstores/sqlalchemy.py b/apscheduler/jobstores/sqlalchemy.py index 7319761..8dd5d8b 100644 --- a/apscheduler/jobstores/sqlalchemy.py +++ b/apscheduler/jobstores/sqlalchemy.py @@ -30,7 +30,7 @@ class SQLAlchemyJobStore(BaseJobStore): if engine: self.engine = maybe_ref(engine) elif url: - self.engine = create_engine(url, echo=True) + self.engine = create_engine(url) else: raise ValueError('Need either "engine" or "url" defined') @@ -40,7 +40,7 @@ class SQLAlchemyJobStore(BaseJobStore): pickle_coltype = PickleType(pickle_protocol) self.jobs_t = Table( tablename, metadata, - Column('id', Unicode(1024), primary_key=True), + Column('id', Unicode(1024, _warn_on_bytestring=False), primary_key=True), Column('next_run_time', BigInteger, index=True), Column('job_data', pickle_coltype, nullable=False) ) diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py index f4521b5..509604a 100644 --- a/apscheduler/schedulers/base.py +++ b/apscheduler/schedulers/base.py @@ -9,11 +9,12 @@ from dateutil.tz import tzlocal import six from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError -from apscheduler.util import * +from apscheduler.executors.base import MaxInstancesReachedError +from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.memory import MemoryJobStore -from apscheduler.job import Job, MaxInstancesReachedError, JobHandle +from apscheduler.job import Job, JobHandle from apscheduler.events import * -from apscheduler.threadpool import ThreadPool +from apscheduler.util import * class BaseScheduler(six.with_metaclass(ABCMeta)): @@ -27,6 +28,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): def __init__(self, gconfig={}, **options): super(BaseScheduler, self).__init__() + self._executors = {} + self._executors_lock = self._create_lock() self._jobstores = {} self._jobstores_lock = self._create_lock() self._listeners = [] @@ -52,9 +55,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): if self.running: raise SchedulerAlreadyRunningError - # Create a RAMJobStore as the default if there is no default job store + # Create a default executor if nothing else is configured + if not 'default' in self._executors: + self.add_executor(self._create_default_executor(), 'default') + + # Create a default job store if nothing else is configured if not 'default' in self._jobstores: - self.add_jobstore(MemoryJobStore(), 'default', True) + self.add_jobstore(self._create_default_jobstore(), 'default', True) # Schedule all pending jobs for job, jobstore in self._pending_jobs: @@ -79,8 +86,9 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self._stopped = True - # Shut down the thread pool - self._threadpool.shutdown(wait) + # Shut down all executors + for executor in six.itervalues(self._executors): + executor.shutdown(wait) # Close all job stores for jobstore in six.itervalues(self._jobstores): @@ -93,7 +101,23 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): def running(self): return not self._stopped - def add_jobstore(self, jobstore, alias, quiet=False): + def add_executor(self, executor, alias='default'): + """ + Adds an executor to this scheduler. + + :param executor: the executor instance to be added + :param alias: alias for the scheduler + :type executor: `~apscheduler.executors.base.BaseExecutor` + :type alias: str + """ + + with self._executors_lock: + if alias in self._executors: + raise KeyError('This scheduler already has an executor by the alias of "%s"' % alias) + executor.scheduler = self + self._executors[alias] = executor + + def add_jobstore(self, jobstore, alias='default', quiet=False): """ Adds a job store to this scheduler. @@ -156,7 +180,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): del self._listeners[i] def add_job(self, trigger, func, args=None, kwargs=None, id=None, name=None, misfire_grace_time=None, coalesce=None, - max_runs=None, max_instances=1, jobstore='default', **trigger_args): + max_runs=None, max_instances=1, jobstore='default', executor='default', **trigger_args): """ Adds the given job to the job list and notifies the scheduler thread. @@ -182,6 +206,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param max_runs: maximum number of times this job is allowed to be triggered :param max_instances: maximum number of concurrently running instances allowed for this job :param jobstore: alias of the job store to store the job in + :param executor: alias of the executor to run the job with :type id: str/unicode :type args: list/tuple :type jobstore: str/unicode @@ -190,6 +215,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :type coalesce: bool :type max_runs: int :type max_instances: int + :type jobstore: str + :type executor: str :rtype: :class:`~apscheduler.job.JobHandle` """ @@ -198,6 +225,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): job_kwargs = { 'trigger': trigger, 'trigger_args': trigger_args, + 'executor': executor, 'func': func, 'args': tuple(args) if args is not None else (), 'kwargs': dict(kwargs) if kwargs is not None else {}, @@ -220,12 +248,12 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): return JobHandle(self, jobstore, job) def scheduled_job(self, trigger, args=None, kwargs=None, id=None, name=None, misfire_grace_time=None, coalesce=None, - max_runs=None, max_instances=1, jobstore='default', **trigger_args): + max_runs=None, max_instances=1, jobstore='default', executor='default', **trigger_args): """A decorator version of :meth:`add_job`.""" def inner(func): self.add_job(trigger, func, args, kwargs, id, misfire_grace_time, coalesce, name, max_runs, - max_instances, jobstore, **trigger_args) + max_instances, jobstore, executor, **trigger_args) return func return inner @@ -360,12 +388,19 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self.coalesce = asbool(config.pop('coalesce', True)) self.timezone = astimezone(config.pop('timezone', None)) or tzlocal() - # Configure the thread pool - if 'threadpool' in config: - self._threadpool = maybe_ref(config['threadpool']) - else: - threadpool_opts = combine_opts(config, 'threadpool.') - self._threadpool = ThreadPool(**threadpool_opts) + # Configure executors + executor_opts = combine_opts(config, 'executor.') + executors = {} + for key, value in executor_opts.items(): + store_name, option = key.split('.', 1) + opts_dict = executors.setdefault(store_name, {}) + opts_dict[option] = value + + for alias, opts in executors.items(): + classname = opts.pop('class') + cls = maybe_ref(classname) + executor = cls(**opts) + self.add_executor(executor, alias) # Configure job stores jobstore_opts = combine_opts(config, 'jobstore.') @@ -381,6 +416,18 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): jobstore = cls(**opts) self.add_jobstore(jobstore, alias, True) + def _create_default_jobstore(self): + return MemoryJobStore() + + def _create_default_executor(self): + return ThreadPoolExecutor(self) + + def _lookup_executor(self, executor): + try: + return self._executors[executor] + except KeyError: + raise KeyError('No such executor: %s' % executor) + def _notify_listeners(self, event): with self._listeners_lock: listeners = tuple(self._listeners) @@ -393,8 +440,9 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self.logger.exception('Error notifying listener') def _real_add_job(self, job, jobstore, wakeup): - # Recalculate the next run time - job.next_run_time = job.trigger.get_next_fire_time(self._current_time()) + # Calculate the next run time + now = datetime.now(self.timezone) + job.next_run_time = job.trigger.get_next_fire_time(now) # Add the job to the given job store store = self._jobstores.get(jobstore) @@ -417,70 +465,22 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): """Triggers :meth:`_process_jobs` to be run in an implementation specific manner.""" def _create_lock(self): - return RLock() - - def _current_time(self): - return datetime.now(self.timezone) - - def _run_job(self, job, run_times): - """Acts as a harness that runs the actual job code in the thread pool.""" - - for run_time in run_times: - # See if the job missed its run time window, and handle possible - # misfires accordingly - difference = self._current_time() - run_time - grace_time = timedelta(seconds=job.misfire_grace_time) - if difference > grace_time: - # Notify listeners about a missed run - event = JobEvent(EVENT_JOB_MISSED, job, run_time) - self._notify_listeners(event) - self.logger.warning('Run time of job "%s" was missed by %s', job, difference) - else: - try: - job.add_instance() - except MaxInstancesReachedError: - event = JobEvent(EVENT_JOB_MISSED, job, run_time) - self._notify_listeners(event) - self.logger.warning( - 'Execution of job "%s" skipped: maximum number of running instances reached (%d)', job, - job.max_instances) - break - - self.logger.info('Running job "%s" (scheduled at %s)', job, run_time) - - try: - retval = job.func(*job.args, **job.kwargs) - except: - # Notify listeners about the exception - exc, tb = sys.exc_info()[1:] - event = JobEvent(EVENT_JOB_ERROR, job, run_time, exception=exc, traceback=tb) - self._notify_listeners(event) + """Creates a reentrant lock object.""" - self.logger.exception('Job "%s" raised an exception', job) - else: - # Notify listeners about successful execution - event = JobEvent(EVENT_JOB_EXECUTED, job, run_time, retval=retval) - self._notify_listeners(event) - - self.logger.info('Job "%s" executed successfully', job) - - job.remove_instance() - - # If coalescing is enabled, don't attempt any further runs - if job.coalesce: - break + return RLock() def _process_jobs(self): - """Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for - the next round. + """ + Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for the next + round. """ self.logger.debug('Looking for jobs to run') - now = self._current_time() + now = datetime.now(self.timezone) next_wakeup_time = None with self._jobstores_lock: - for alias, jobstore in six.iteritems(self._jobstores): + for jobstore_alias, jobstore in six.iteritems(self._jobstores): jobs, jobstore_next_wakeup_time = jobstore.get_pending_jobs(now) if not next_wakeup_time: next_wakeup_time = jobstore_next_wakeup_time @@ -488,27 +488,42 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): next_wakeup_time = min(next_wakeup_time or jobstore_next_wakeup_time) for job in jobs: + # Look up the job's executor + try: + executor = self._lookup_executor(job.executor) + except: + self.logger.error('Executor lookup failed for job "%s": %s', job, job.executor) + continue + run_times = job.get_run_times(now) + run_times = run_times[-1:] if run_times and job.coalesce else run_times if run_times: - self._threadpool.submit(self._run_job, job, run_times) - - # Update the job, but don't keep finished jobs around - job_runs = job.runs + 1 if job.coalesce else len(run_times) + try: + executor.submit_job(job, run_times) + except MaxInstancesReachedError: + self.logger.warning( + 'Execution of job "%s" skipped: maximum number of running instances reached (%d)', + job, job.max_instances) + continue + except: + self.logger.exception('Error submitting job "%s" to executor "%s"', job, job.executor) + continue + + # Update the job if it has a next execution time and the number of runs has not reached maximum, + # otherwise remove it from the job store + job_runs = job.runs + len(run_times) job_next_run = job.trigger.get_next_fire_time(now + timedelta(microseconds=1)) if job_next_run and (job.max_runs is None or job_runs < job.max_runs): changes = {'next_run_time': job_next_run, 'runs': job_runs} jobstore.modify_job(job.id, changes) + next_wakeup_time = min(next_wakeup_time, job_next_run) if next_wakeup_time else job_next_run else: - self.remove_job(job.id, alias) - - if not next_wakeup_time: - next_wakeup_time = job.next_run_time - elif job.next_run_time: - next_wakeup_time = min(next_wakeup_time, job.next_run_time) + self.remove_job(job.id, jobstore_alias) # Determine the delay until this method should be called again if next_wakeup_time is not None: - wait_seconds = time_difference(next_wakeup_time, now) + wait_seconds = timedelta_seconds(next_wakeup_time - now) + self.logger.debug('now = %s, next_wakeup_time = %s', now, next_wakeup_time) self.logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) else: wait_seconds = None diff --git a/apscheduler/threadpool.py b/apscheduler/threadpool.py deleted file mode 100644 index 6419941..0000000 --- a/apscheduler/threadpool.py +++ /dev/null @@ -1,124 +0,0 @@ -""" -Generic thread pool class. Modeled after Java's ThreadPoolExecutor. -Please note that this ThreadPool does *not* fully implement the PEP 3148 ThreadPool! -""" - -from threading import Thread, Lock, currentThread -from weakref import ref -import logging -import atexit - -try: - from queue import Queue, Empty -except ImportError: - from Queue import Queue, Empty - -logger = logging.getLogger(__name__) -_threadpools = set() - - -# Worker threads are daemonic in order to let the interpreter exit without an explicit shutdown of the thread pool. -# The following trick is necessary to allow worker threads to finish cleanly. -def _shutdown_all(): - for pool_ref in tuple(_threadpools): - pool = pool_ref() - if pool: - pool.shutdown() - -atexit.register(_shutdown_all) - - -class ThreadPool(object): - def __init__(self, core_threads=0, max_threads=20, keepalive=1): - """ - :param core_threads: maximum number of persistent threads in the pool - :param max_threads: maximum number of total threads in the pool - :param thread_class: callable that creates a Thread object - :param keepalive: seconds to keep non-core worker threads waiting for new tasks - """ - self.core_threads = core_threads - self.max_threads = max(max_threads, core_threads, 1) - self.keepalive = keepalive - self._queue = Queue() - self._threads_lock = Lock() - self._threads = set() - self._shutdown = False - - _threadpools.add(ref(self)) - logger.info('Started thread pool with %d core threads and %s maximum threads', core_threads, - max_threads or 'unlimited') - - def _adjust_threadcount(self): - with self._threads_lock: - if self.num_threads < self.max_threads: - self._add_thread(self.num_threads < self.core_threads) - - def _add_thread(self, core): - t = Thread(target=self._run_jobs, args=(core,)) - t.setDaemon(True) - t.start() - self._threads.add(t) - - def _run_jobs(self, core): - logger.debug('Started worker thread') - block = True - timeout = None - if not core: - block = self.keepalive > 0 - timeout = self.keepalive - - while True: - try: - func, args, kwargs = self._queue.get(block, timeout) - except Empty: - break - - if self._shutdown: - break - - try: - func(*args, **kwargs) - except: - logger.exception('Error in worker thread') - - with self._threads_lock: - self._threads.remove(currentThread()) - - logger.debug('Exiting worker thread') - - @property - def num_threads(self): - return len(self._threads) - - def submit(self, func, *args, **kwargs): - if self._shutdown: - raise RuntimeError('Cannot schedule new tasks after shutdown') - - self._queue.put((func, args, kwargs)) - self._adjust_threadcount() - - def shutdown(self, wait=True): - if self._shutdown: - return - - logging.info('Shutting down thread pool') - self._shutdown = True - _threadpools.remove(ref(self)) - - with self._threads_lock: - for _ in range(self.num_threads): - self._queue.put((None, None, None)) - - if wait: - with self._threads_lock: - threads = tuple(self._threads) - for thread in threads: - thread.join() - - def __repr__(self): - if self.max_threads: - threadcount = '%d/%d' % (self.num_threads, self.max_threads) - else: - threadcount = '%d' % self.num_threads - - return '<ThreadPool at %x; threads=%s>' % (id(self), threadcount) diff --git a/apscheduler/util.py b/apscheduler/util.py index c5a5e3e..170ff99 100644 --- a/apscheduler/util.py +++ b/apscheduler/util.py @@ -4,15 +4,14 @@ This module contains several handy functions primarily meant for internal use. from datetime import date, datetime, timedelta, tzinfo from calendar import timegm -from time import mktime import re from dateutil.tz import gettz, tzutc import six __all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'datetime_to_utc_timestamp', - 'utc_timestamp_to_datetime', 'timedelta_seconds', 'time_difference', 'datetime_ceil', 'combine_opts', - 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref') + 'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'combine_opts', 'get_callable_name', + 'obj_to_ref', 'ref_to_obj', 'maybe_ref') def asint(text): @@ -121,22 +120,6 @@ def timedelta_seconds(delta): delta.microseconds / 1000000.0 -def time_difference(date1, date2): - """ - Returns the time difference in seconds between the given two datetime objects. - The difference is calculated as: date1 - date2. - - :param date1: the later datetime - :type date1: datetime - :param date2: the earlier datetime - :type date2: datetime - :rtype: float - """ - later = mktime(date1.timetuple()) + date1.microsecond / 1000000.0 - earlier = mktime(date2.timetuple()) + date2.microsecond / 1000000.0 - return later - earlier - - def datetime_ceil(dateval): """ Rounds the given datetime object upwards. diff --git a/examples/executors/processpool.py b/examples/executors/processpool.py new file mode 100644 index 0000000..e76eaa6 --- /dev/null +++ b/examples/executors/processpool.py @@ -0,0 +1,24 @@ +""" +Demonstrates how to schedule a job to be run in a process pool on 3 second intervals. +""" + +from datetime import datetime +from apscheduler.executors.pool import ProcessPoolExecutor + +from apscheduler.schedulers.blocking import BlockingScheduler + + +def tick(): + print('Tick! The time is: %s' % datetime.now()) + + +if __name__ == '__main__': + scheduler = BlockingScheduler() + scheduler.add_executor(ProcessPoolExecutor(scheduler)) + scheduler.add_job('interval', tick, seconds=3) + print('Press Ctrl+C to exit') + + try: + scheduler.start() + except (KeyboardInterrupt, SystemExit): + pass diff --git a/examples/jobstores/mongodb.py b/examples/jobstores/mongodb.py index 49f1f0b..c6895e8 100644 --- a/examples/jobstores/mongodb.py +++ b/examples/jobstores/mongodb.py @@ -18,9 +18,13 @@ def alarm(time): if __name__ == '__main__': scheduler = BlockingScheduler() - scheduler.add_jobstore(MongoDBJobStore(collection='example_jobs')) + jobstore = MongoDBJobStore(collection='example_jobs') + if len(sys.argv) > 1 and sys.argv[1] == '--clear': + jobstore.remove_all_jobs() + + scheduler.add_jobstore(jobstore) alarm_time = datetime.now() + timedelta(seconds=10) - scheduler.add_job(alarm, 'date', [alarm_time], args=[datetime.now()]) + scheduler.add_job('date', alarm, run_date=alarm_time, args=[datetime.now()]) print('To clear the alarms, run this example with the --clear argument.') print('Press Ctrl+C to exit') diff --git a/examples/jobstores/sqlalchemy_.py b/examples/jobstores/sqlalchemy_.py index 561c3f8..4423d11 100644 --- a/examples/jobstores/sqlalchemy_.py +++ b/examples/jobstores/sqlalchemy_.py @@ -18,10 +18,10 @@ def alarm(time): if __name__ == '__main__': scheduler = BlockingScheduler() - url = sys.argv[1] or 'sqlite://example.sqlite' + url = sys.argv[1] if len(sys.argv) > 1 else 'sqlite:///example.sqlite' scheduler.add_jobstore(SQLAlchemyJobStore(url)) alarm_time = datetime.now() + timedelta(seconds=10) - scheduler.add_job(alarm, 'date', [alarm_time], args=[datetime.now()]) + scheduler.add_job('date', alarm, run_date=alarm_time, args=[datetime.now()]) print('To clear the alarms, delete the example.sqlite file.') print('Press Ctrl+C to exit') diff --git a/examples/misc/reference.py b/examples/misc/reference.py index 2b0c414..8bb41bf 100644 --- a/examples/misc/reference.py +++ b/examples/misc/reference.py @@ -7,7 +7,7 @@ from apscheduler.schedulers.blocking import BlockingScheduler if __name__ == '__main__': scheduler = BlockingScheduler() - scheduler.add_job('sys:stdout.write', 'interval', {'seconds': 3}, args=['tick\n']) + scheduler.add_job('interval', 'sys:stdout.write', seconds=3, args=['tick\n']) print('Press Ctrl+C to exit') try: diff --git a/examples/schedulers/asyncio_.py b/examples/schedulers/asyncio_.py index 01cc138..df6574e 100644 --- a/examples/schedulers/asyncio_.py +++ b/examples/schedulers/asyncio_.py @@ -14,7 +14,7 @@ def tick(): if __name__ == '__main__': scheduler = AsyncIOScheduler() - scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.add_job('interval', tick, seconds=3) scheduler.start() print('Press Ctrl+C to exit') diff --git a/examples/schedulers/background.py b/examples/schedulers/background.py index 8a3dde5..d9862e9 100644 --- a/examples/schedulers/background.py +++ b/examples/schedulers/background.py @@ -14,7 +14,7 @@ def tick(): if __name__ == '__main__': scheduler = BackgroundScheduler() - scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.add_job('interval', tick, seconds=3) scheduler.start() print('Press Ctrl+C to exit') diff --git a/examples/schedulers/blocking.py b/examples/schedulers/blocking.py index 55d1385..359b366 100644 --- a/examples/schedulers/blocking.py +++ b/examples/schedulers/blocking.py @@ -13,7 +13,7 @@ def tick(): if __name__ == '__main__': scheduler = BlockingScheduler() - scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.add_job('interval', tick, seconds=3) print('Press Ctrl+C to exit') try: diff --git a/examples/schedulers/gevent_.py b/examples/schedulers/gevent_.py index 597aa4d..bf83fb3 100644 --- a/examples/schedulers/gevent_.py +++ b/examples/schedulers/gevent_.py @@ -13,7 +13,7 @@ def tick(): if __name__ == '__main__': scheduler = GeventScheduler() - scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.add_job('interval', tick, seconds=3) g = scheduler.start() # g is the greenlet that runs the scheduler loop print('Press Ctrl+C to exit') diff --git a/examples/schedulers/qt.py b/examples/schedulers/qt.py index 15b9192..1ea9fba 100644 --- a/examples/schedulers/qt.py +++ b/examples/schedulers/qt.py @@ -30,7 +30,7 @@ if __name__ == '__main__': label.show() scheduler = QtScheduler() - scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.add_job('interval', tick, seconds=3) scheduler.start() # Execution will block here until the user closes the windows or Ctrl+C is pressed. diff --git a/examples/schedulers/tornado_.py b/examples/schedulers/tornado_.py index 42ddab5..92d0ef1 100644 --- a/examples/schedulers/tornado_.py +++ b/examples/schedulers/tornado_.py @@ -14,7 +14,7 @@ def tick(): if __name__ == '__main__': scheduler = TornadoScheduler() - scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.add_job('interval', tick, seconds=3) scheduler.start() print('Press Ctrl+C to exit') diff --git a/examples/schedulers/twisted_.py b/examples/schedulers/twisted_.py index a1f7358..33a8065 100644 --- a/examples/schedulers/twisted_.py +++ b/examples/schedulers/twisted_.py @@ -14,7 +14,7 @@ def tick(): if __name__ == '__main__': scheduler = TwistedScheduler() - scheduler.add_job(tick, 'interval', {'seconds': 3}) + scheduler.add_job('interval', tick, seconds=3) scheduler.start() print('Press Ctrl+C to exit') @@ -19,6 +19,10 @@ class PyTest(TestCommand): errno = pytest.main(self.test_args) sys.exit(errno) +extra_requirements = [] +if sys.version_info < (3, 2): + extra_requirements.append('futures') + here = os.path.dirname(__file__) readme_path = os.path.join(here, 'README.rst') readme = open(readme_path).read() @@ -45,7 +49,7 @@ setup( keywords='scheduling cron', license='MIT', packages=find_packages(exclude=['tests']), - install_requires=['six', 'python-dateutil'], + install_requires=['six', 'python-dateutil'] + extra_requirements, tests_require=['pytest >= 2.5.1', 'pytest-cov'], cmdclass={'test': PyTest}, zip_safe=False, diff --git a/tests/conftest.py b/tests/conftest.py index 3c1074a..1f72d9d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,14 +1,22 @@ +from datetime import datetime import sys +from dateutil.tz import tzoffset import pytest +from apscheduler.job import Job + +try: + from unittest.mock import Mock +except ImportError: + from mock import Mock + def minpython(*version): version_str = '.'.join([str(num) for num in version]) def outer(func): - dec = pytest.mark.skipif(sys.version_info < version, - reason='This test requires at least Python %s' % version_str) + dec = pytest.mark.skipif(sys.version_info < version, reason='Requires Python >= %s' % version_str) return dec(func) return outer @@ -17,7 +25,57 @@ def maxpython(*version): version_str = '.'.join([str(num) for num in version]) def outer(func): - dec = pytest.mark.skipif(sys.version_info >= version, - reason='This test should not be run on Python version %s or above' % version_str) + dec = pytest.mark.skipif(sys.version_info >= version, reason='Requires Python < %s' % version_str) return dec(func) return outer + + +@pytest.fixture(scope='session') +def timezone(): + return tzoffset('DUMMYTZ', 3600) + + +@pytest.fixture +def freeze_time(monkeypatch, timezone): + class TimeFreezer: + def __init__(self, initial): + self.current = initial + self.increment = None + + def get(self, tzinfo): + now = self.current.astimezone(tzinfo) + if self.increment: + self.current += self.increment + return now + + def set(self, new_time): + self.current = new_time + + def next(self,): + return self.current + self.increment + + def set_increment(self, delta): + self.increment = delta + + freezer = TimeFreezer(datetime(2011, 4, 3, 18, 40, tzinfo=timezone)) + fake_datetime = Mock(datetime, now=freezer.get) + monkeypatch.setattr('apscheduler.schedulers.base.datetime', fake_datetime) + monkeypatch.setattr('apscheduler.executors.pool.datetime', fake_datetime) + return freezer + + +@pytest.fixture(scope='session') +def job_defaults(timezone): + run_date = datetime(2011, 4, 3, 18, 40, tzinfo=timezone) + return {'trigger': 'date', 'trigger_args': {'run_date': run_date, 'timezone': timezone}, 'executor': 'default', + 'args': (), 'kwargs': {}, 'id': 'testid', 'misfire_grace_time': 1, 'coalesce': False, 'name': None, + 'max_runs': None, 'max_instances': 1} + + +@pytest.fixture(scope='session') +def create_job(job_defaults): + def create(**kwargs): + job_kwargs = job_defaults.copy() + job_kwargs.update(kwargs) + return Job(**job_kwargs) + return create diff --git a/tests/test_executors.py b/tests/test_executors.py new file mode 100644 index 0000000..e16897b --- /dev/null +++ b/tests/test_executors.py @@ -0,0 +1,46 @@ +import time + +import pytest + +from apscheduler.executors.base import MaxInstancesReachedError +from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor + +try: + from unittest.mock import Mock, MagicMock +except ImportError: + from mock import Mock, MagicMock + + +@pytest.fixture +def scheduler(): + scheduler_ = Mock([]) + scheduler_._create_lock = MagicMock() + return scheduler_ + + +@pytest.fixture(params=[ThreadPoolExecutor, ProcessPoolExecutor], ids=['threadpool', 'processpool']) +def executor(request, scheduler): + executor_ = request.param(scheduler) + request.addfinalizer(executor_.shutdown) + return executor_ + + +def wait_event(): + time.sleep(0.2) + return 'test' + + +def test_max_instances(scheduler, executor, create_job, freeze_time): + """Tests that the maximum instance limit on a job is respected.""" + + events = [] + scheduler._notify_listeners = lambda event: events.append(event) + job = create_job(func=wait_event, max_instances=2, max_runs=3) + executor.submit_job(job, [freeze_time.current]) + executor.submit_job(job, [freeze_time.current]) + + pytest.raises(MaxInstancesReachedError, executor.submit_job, job, [freeze_time.current]) + executor.shutdown() + assert len(events) == 2 + assert events[0].retval == 'test' + assert events[1].retval == 'test' diff --git a/tests/test_job.py b/tests/test_job.py index 5d38032..fee6c4e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -4,7 +4,7 @@ from dateutil.tz import tzoffset import pytest import six -from apscheduler.job import Job, MaxInstancesReachedError, JobHandle +from apscheduler.job import Job, JobHandle from apscheduler.triggers.date import DateTrigger from tests.conftest import maxpython @@ -16,7 +16,7 @@ except ImportError: local_tz = tzoffset('DUMMYTZ', 3600) run_time = datetime(2010, 12, 13, 0, 8, 0, tzinfo=local_tz) -job_defaults = {'trigger': 'date', 'trigger_args': {'run_date': run_time, 'timezone': local_tz}, +job_defaults = {'trigger': 'date', 'executor': 'default', 'trigger_args': {'run_date': run_time, 'timezone': local_tz}, 'func': '%s:dummyfunc' % __name__, 'args': (), 'kwargs': {}, 'id': 'testid', 'misfire_grace_time': 1, 'coalesce': False, 'name': None, 'max_runs': None, 'max_instances': 1} @@ -75,17 +75,18 @@ class TestJob(object): def test_getstate(self, trigger): job = create_job(trigger=trigger, trigger_args=None) state = job.__getstate__() - assert state == dict(version=1, trigger=trigger, func='tests.test_job:dummyfunc', name='dummyfunc', args=(), - kwargs={}, id='testid', misfire_grace_time=1, coalesce=False, max_runs=None, - max_instances=1, runs=0, next_run_time=None) + assert state == dict(version=1, trigger=trigger, executor='default', func='tests.test_job:dummyfunc', + name='dummyfunc', args=(), kwargs={}, id='testid', misfire_grace_time=1, coalesce=False, + max_runs=None, max_instances=1, runs=0, next_run_time=None) def test_setstate(self, job): trigger = DateTrigger(local_tz, '2010-12-14 13:05:00') - state = dict(version=1, trigger=trigger, func='tests.test_job:dummyfunc', name='testjob.dummyfunc', - args=[], kwargs={}, id='other_id', misfire_grace_time=2, max_runs=2, coalesce=True, - max_instances=2, runs=1, next_run_time=None) + state = dict(version=1, trigger=trigger, executor='dummyexecutor', func='tests.test_job:dummyfunc', + name='testjob.dummyfunc', args=[], kwargs={}, id='other_id', misfire_grace_time=2, max_runs=2, + coalesce=True, max_instances=2, runs=1, next_run_time=None) job.__setstate__(state) assert job.id == 'other_id' + assert job.executor == 'dummyexecutor' assert job.trigger == trigger assert job.func == dummyfunc assert job.max_runs == 2 @@ -105,28 +106,6 @@ class TestJob(object): assert not job == 'bleh' - def test_instances(self, job): - job.max_instances = 2 - assert job.instances == 0 - - job.add_instance() - assert job.instances == 1 - - job.add_instance() - assert job.instances == 2 - - with pytest.raises(MaxInstancesReachedError): - job.add_instance() - - job.remove_instance() - assert job.instances == 1 - - job.remove_instance() - assert job.instances == 0 - - with pytest.raises(AssertionError): - job.remove_instance() - def test_job_repr(self, job): assert repr(job) == '<Job (id=testid)>' diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py index a899eba..5c87c94 100644 --- a/tests/test_jobstores.py +++ b/tests/test_jobstores.py @@ -1,7 +1,6 @@ from datetime import datetime import os -from dateutil.tz import tzoffset import pytest from apscheduler.jobstores.memory import MemoryJobStore @@ -9,10 +8,10 @@ from apscheduler.jobstores.base import JobLookupError, ConflictingIdError, Trans from apscheduler.triggers.date import DateTrigger from apscheduler.job import Job -local_tz = tzoffset('DUMMYTZ', 3600) +#local_tz = tzoffset('DUMMYTZ', 3600) run_time = datetime(2999, 1, 1) -job_defaults = {'args': (), 'kwargs': {}, 'misfire_grace_time': 1, 'coalesce': False, 'name': None, 'max_runs': None, - 'max_instances': 1} +#job_defaults = {'args': (), 'kwargs': {}, 'misfire_grace_time': 1, 'coalesce': False, 'name': None, 'max_runs': None, +# 'max_instances': 1} def dummy_job(): @@ -36,7 +35,8 @@ def memjobstore(request): def sqlalchemyjobstore(request): def finish(): store.close() - os.remove('tempdb.sqlite') + if os.path.exists('tempdb.sqlite'): + os.remove('tempdb.sqlite') sqlalchemy = pytest.importorskip('apscheduler.jobstores.sqlalchemy') store = sqlalchemy.SQLAlchemyJobStore(url='sqlite:///tempdb.sqlite') @@ -57,72 +57,74 @@ def mongodbjobstore(request): return store -@pytest.fixture +@pytest.fixture(params=[memjobstore, sqlalchemyjobstore, mongodbjobstore], ids=['memory', 'sqlalchemy', 'mongodb']) def jobstore(request): return request.param(request) -def create_job(jobstore, func=dummy_job, trigger_date=run_time, id=None): - trigger_date.replace(tzinfo=local_tz) - trigger = DateTrigger(local_tz, trigger_date) - job = Job(trigger=trigger, func=func, id=id, next_run_time=trigger.run_date, **job_defaults) - jobstore.add_job(job) - return job - -persistent_jobstores = [sqlalchemyjobstore, mongodbjobstore] -persistent_jobstores_ids = ['sqlalchemy', 'mongodb'] -all_jobstores = [memjobstore] + persistent_jobstores -all_jobstores_ids = ['memory'] + persistent_jobstores_ids +@pytest.fixture(params=[sqlalchemyjobstore, mongodbjobstore], ids=['sqlalchemy', 'mongodb']) +def persistent_jobstore(request): + return request.param(request) -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_lookup_job(jobstore): +@pytest.fixture +def create_job(timezone, job_defaults): + def create(jobstore, func=dummy_job, trigger_date=run_time, id=None): + trigger_date = trigger_date.replace(tzinfo=timezone) + trigger = DateTrigger(timezone, trigger_date) + job_kwargs = job_defaults.copy() + job_kwargs['func'] = func + job_kwargs['trigger'] = trigger + job_kwargs['id'] = id + job = Job(**job_kwargs) + job.next_run_time = job.trigger.get_next_fire_time(trigger_date) + jobstore.add_job(job) + return job + + return create + + +def test_lookup_job(jobstore, create_job): initial_job = create_job(jobstore) job = jobstore.lookup_job(initial_job.id) assert job == initial_job -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) def test_lookup_nonexistent_job(jobstore): pytest.raises(JobLookupError, jobstore.lookup_job, 'foo') -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_get_all_jobs(jobstore): +def test_get_all_jobs(jobstore, create_job): job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) job2 = create_job(jobstore, dummy_job2, datetime(2013, 8, 14)) jobs = jobstore.get_all_jobs() assert jobs == [job2, job1] -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_get_pending_jobs(jobstore): +def test_get_pending_jobs(jobstore, create_job, timezone): job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) job3 = create_job(jobstore, dummy_job3, datetime(2013, 8, 14)) - jobs, next_run_time = jobstore.get_pending_jobs(datetime(2014, 2, 27, tzinfo=local_tz)) + jobs, next_run_time = jobstore.get_pending_jobs(datetime(2014, 2, 27, tzinfo=timezone)) assert jobs == [job3, job2] assert next_run_time == job1.trigger.run_date -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_get_pending_jobs_no_next(jobstore): +def test_get_pending_jobs_no_next(jobstore, create_job, timezone): job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) job3 = create_job(jobstore, dummy_job3, datetime(2013, 8, 14)) - jobs, next_run_time = jobstore.get_pending_jobs(datetime(2016, 5, 10, tzinfo=local_tz)) + jobs, next_run_time = jobstore.get_pending_jobs(datetime(2016, 5, 10, tzinfo=timezone)) assert jobs == [job3, job2, job1] assert next_run_time is None -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_add_job_conflicting_id(jobstore): +def test_add_job_conflicting_id(jobstore, create_job): create_job(jobstore, dummy_job, datetime(2016, 5, 3), id='blah') pytest.raises(ConflictingIdError, create_job, jobstore, dummy_job2, datetime(2014, 2, 26), id='blah') -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_update_job_id_and_others(jobstore): +def test_update_job_id_and_others(jobstore, create_job): job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) jobstore.modify_job(job1.id, {'id': 'foo', 'max_instances': 6}) @@ -134,20 +136,18 @@ def test_update_job_id_and_others(jobstore): assert jobs[1].max_instances == 6 -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_update_job_next_runtime(jobstore): +def test_update_job_next_runtime(jobstore, create_job, timezone): job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) job3 = create_job(jobstore, dummy_job3, datetime(2013, 8, 14)) - jobstore.modify_job(job1.id, {'next_run_time': datetime(2014, 1, 3, tzinfo=local_tz)}) + jobstore.modify_job(job1.id, {'next_run_time': datetime(2014, 1, 3, tzinfo=timezone)}) jobs = jobstore.get_all_jobs() assert len(jobs) == 3 assert jobs == [job3, job1, job2] -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_update_job_next_runtime_empty(jobstore): +def test_update_job_next_runtime_empty(jobstore, create_job): job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) jobstore.modify_job(job1.id, {'next_run_time': None}) @@ -157,38 +157,33 @@ def test_update_job_next_runtime_empty(jobstore): assert jobs == [job1, job2] -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_update_job_conflicting_id(jobstore): +def test_update_job_conflicting_id(jobstore, create_job): job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) pytest.raises(ConflictingIdError, jobstore.modify_job, job2.id, {'id': job1.id}) -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) def test_update_job_nonexistent_job(jobstore): pytest.raises(JobLookupError, jobstore.modify_job, 'foo', {'next_run_time': None}) -@pytest.mark.parametrize('jobstore', persistent_jobstores, indirect=True, ids=persistent_jobstores_ids) -def test_one_job_fails_to_load(jobstore, monkeypatch): - job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) - job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) - job3 = create_job(jobstore, dummy_job3, datetime(2013, 8, 14)) +def test_one_job_fails_to_load(persistent_jobstore, create_job, monkeypatch): + job1 = create_job(persistent_jobstore, dummy_job, datetime(2016, 5, 3)) + job2 = create_job(persistent_jobstore, dummy_job2, datetime(2014, 2, 26)) + job3 = create_job(persistent_jobstore, dummy_job3, datetime(2013, 8, 14)) # Make the dummy_job2 function disappear monkeypatch.delitem(globals(), 'dummy_job2') - jobs = jobstore.get_all_jobs() + jobs = persistent_jobstore.get_all_jobs() assert jobs == [job3, job1] -@pytest.mark.parametrize('jobstore', persistent_jobstores, indirect=True, ids=persistent_jobstores_ids) -def test_transient_job_error(jobstore): - pytest.raises(TransientJobError, create_job, jobstore, lambda: None, datetime(2016, 5, 3)) +def test_transient_job_error(persistent_jobstore, create_job): + pytest.raises(TransientJobError, create_job, persistent_jobstore, lambda: None, datetime(2016, 5, 3)) -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_remove_job(jobstore): +def test_remove_job(jobstore, create_job): job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3)) job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) @@ -201,13 +196,11 @@ def test_remove_job(jobstore): assert jobs == [] -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) def test_remove_nonexistent_job(jobstore): pytest.raises(JobLookupError, jobstore.remove_job, 'blah') -@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids) -def test_remove_all_jobs(jobstore): +def test_remove_all_jobs(jobstore, create_job): create_job(jobstore, dummy_job, datetime(2016, 5, 3)) create_job(jobstore, dummy_job2, datetime(2014, 2, 26)) @@ -228,7 +221,7 @@ def test_repr_mongodbjobstore(mongodbjobstore): assert repr(mongodbjobstore) == "<MongoDBJobStore (connection=Connection('localhost', 27017))>" -def test_memstore_close(memjobstore): +def test_memstore_close(memjobstore, create_job): create_job(memjobstore, dummy_job, datetime(2016, 5, 3)) memjobstore.close() assert not memjobstore.get_all_jobs() diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 4802232..15f6370 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -1,22 +1,21 @@ from datetime import datetime, timedelta from logging import StreamHandler, ERROR -from threading import Event, Thread +from concurrent.futures import Future +from threading import Thread from copy import copy -from time import sleep import os -from dateutil.tz import tzoffset import pytest +from apscheduler.executors.pool import BasePoolExecutor from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError -from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.base import BaseScheduler +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import (SchedulerEvent, EVENT_JOB_EXECUTED, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MISSED, EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_JOB_ADDED, EVENT_JOBSTORE_JOB_REMOVED, EVENT_JOBSTORE_JOB_MODIFIED) -from apscheduler.schedulers.blocking import BlockingScheduler -from apscheduler.threadpool import ThreadPool from tests.conftest import minpython try: @@ -29,15 +28,20 @@ try: except ImportError: from mock import MagicMock -dummy_tz = tzoffset('DUMMYTZ', 3600) -dummy_datetime = datetime(2011, 4, 3, 18, 40, tzinfo=dummy_tz) +class DummyPoolExecutor(object): + def submit(self, func, *arg, **kwargs): + f = Future() + try: + retval = func(*arg, **kwargs) + except Exception as e: + f.set_exception(e) + else: + f.set_result(retval) -class DummyThreadPool(object): - def submit(self, func, *args, **kwargs): - func(*args, **kwargs) + return f - def shutdown(self, wait): + def shutdown(self, wait=True): pass @@ -46,9 +50,9 @@ class DummyException(Exception): class DummyScheduler(BaseScheduler): - def __init__(self, gconfig={}, **options): - super(DummyScheduler, self).__init__(gconfig, timezone=dummy_tz, threadpool=DummyThreadPool(), **options) - self.now = dummy_datetime + def __init__(self, timezone, gconfig={}, **options): + super(DummyScheduler, self).__init__(gconfig, timezone=timezone, **options) + self.add_executor(BasePoolExecutor(self, DummyPoolExecutor()), 'default') def start(self): super(DummyScheduler, self).start() @@ -56,20 +60,20 @@ class DummyScheduler(BaseScheduler): def shutdown(self, wait=True): super(DummyScheduler, self).shutdown() + def _create_default_executor(self): + return DummyPoolExecutor() + def _wakeup(self): pass - def _current_time(self): - return self.now - def increment(vals): vals[0] += 1 @pytest.fixture -def scheduler(request): - sched = DummyScheduler() +def scheduler(request, freeze_time, timezone): + sched = DummyScheduler(timezone) if 'start_scheduler' in request.keywords: sched.start() request.addfinalizer(lambda: sched.shutdown() if sched.running else None) @@ -208,7 +212,7 @@ class TestOfflineScheduler(object): @pytest.mark.start_scheduler class TestRunningScheduler(object): - def test_add_job_object(self, scheduler): + def test_add_job_object(self, scheduler, freeze_time): """Tests that any callable object is accepted (and not just functions).""" class A(object): @@ -220,11 +224,11 @@ class TestRunningScheduler(object): a = A() job = scheduler.add_job('interval', a, seconds=1) - scheduler.now = job.next_run_time + freeze_time.set(job.next_run_time) scheduler._process_jobs() assert a.val == 1 - def test_add_job_method(self, scheduler): + def test_add_job_method(self, scheduler, freeze_time): """Tests that bound methods can be scheduled (at least with MemoryJobStore).""" class A(object): @@ -236,7 +240,7 @@ class TestRunningScheduler(object): a = A() job = scheduler.add_job('interval', a.method, seconds=1) - scheduler.now = job.next_run_time + freeze_time.set(job.next_run_time) scheduler._process_jobs() assert a.val == 1 @@ -283,7 +287,7 @@ class TestRunningScheduler(object): scheduler._process_jobs() assert vals[0] == 1 - def test_remove_all_jobs(self, scheduler): + def test_remove_all_jobs(self, scheduler, freeze_time): """Tests that removing all jobs clears all job stores.""" vals = [0] @@ -303,20 +307,20 @@ class TestRunningScheduler(object): scheduler.remove_all_jobs('default') assert scheduler.get_jobs() == [job2] - def test_job_finished(self, scheduler): + def test_job_finished(self, scheduler, freeze_time): vals = [0] job = scheduler.add_job('interval', increment, args=(vals,), max_runs=1) - scheduler.now = job.next_run_time + freeze_time.set(job.next_run_time) scheduler._process_jobs() assert vals == [1] assert job not in scheduler.get_jobs() - def test_job_exception(self, scheduler, logstream): + def test_job_exception(self, scheduler, freeze_time, logstream): def failure(): raise DummyException job = scheduler.add_job('date', failure, run_date=datetime(9999, 9, 9)) - scheduler.now = job.next_run_time + freeze_time.set(job.next_run_time) scheduler._process_jobs() assert 'DummyException' in logstream.getvalue() @@ -328,17 +332,17 @@ class TestRunningScheduler(object): job = scheduler.add_job('interval', lambda: None, seconds=1, misfire_grace_time=2) assert job.misfire_grace_time == 2 - def test_coalesce_on(self, scheduler): + def test_coalesce_on(self, scheduler, freeze_time): """Tests that the job is only executed once when it is scheduled to be executed twice in a row.""" vals = [0] events = [] scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) - job = scheduler.add_job('interval', increment, seconds=1, start_date=dummy_datetime, args=(vals,), + job = scheduler.add_job('interval', increment, seconds=1, start_date=freeze_time.current, args=(vals,), coalesce=True, misfire_grace_time=2) # Turn the clock 2 seconds forward - scheduler.now += timedelta(seconds=2) + freeze_time.set(freeze_time.current + timedelta(seconds=2)) scheduler._process_jobs() job.refresh() @@ -347,7 +351,7 @@ class TestRunningScheduler(object): assert events[0].code == EVENT_JOB_EXECUTED assert vals == [1] - def test_coalesce_off(self, scheduler): + def test_coalesce_off(self, scheduler, freeze_time): """Tests that every scheduled run for the job is executed even when they are in the past (but still within misfire_grace_time). """ @@ -355,11 +359,11 @@ class TestRunningScheduler(object): vals = [0] events = [] scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) - job = scheduler.add_job('interval', increment, seconds=1, start_date=dummy_datetime, args=(vals,), + job = scheduler.add_job('interval', increment, seconds=1, start_date=freeze_time.current, args=(vals,), coalesce=False, misfire_grace_time=2) # Turn the clock 2 seconds forward - scheduler.now += timedelta(seconds=2) + freeze_time.set(freeze_time.current + timedelta(seconds=2)) scheduler._process_jobs() job.refresh() @@ -397,55 +401,24 @@ class TestRunningScheduler(object): pytest.raises(KeyError, scheduler.remove_jobstore, 'dummy2') - def test_job_next_run_time(self, scheduler): + def test_job_next_run_time(self, scheduler, freeze_time): """Tests against bug #5.""" vals = [0] - job = scheduler.add_job('interval', increment, seconds=1, start_date=dummy_datetime, args=(vals,), + job = scheduler.add_job('interval', increment, seconds=1, start_date=freeze_time.current, args=(vals,), misfire_grace_time=3) - scheduler.now = job.next_run_time + freeze_time.set(job.next_run_time) scheduler._process_jobs() assert vals == [1] scheduler._process_jobs() assert vals == [1] - scheduler.now = job.next_run_time + timedelta(seconds=1) + freeze_time.set(job.next_run_time + timedelta(seconds=1)) scheduler._process_jobs() assert vals == [2] - def test_max_instances(self, scheduler): - """Tests that the maximum instance limit on a job is respected and that missed job events are dispatched when - the job cannot be run due to the instance limitation. - """ - - def wait_event(): - vals[0] += 1 - event.wait(2) - - vals = [0] - events = [] - event = Event() - shutdown_event = Event() - scheduler._threadpool = ThreadPool() - scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) - scheduler.add_listener(lambda e: shutdown_event.set(), EVENT_SCHEDULER_SHUTDOWN) - scheduler.add_job('interval', wait_event, seconds=1, start_date=dummy_datetime, max_instances=2, max_runs=4) - for _ in range(4): - scheduler._process_jobs() - scheduler.now += timedelta(seconds=1) - event.set() - scheduler.shutdown() - shutdown_event.wait(2) - - assert vals == [2] - assert len(events) == 4 - assert events[0].code == EVENT_JOB_MISSED - assert events[1].code == EVENT_JOB_MISSED - assert events[2].code == EVENT_JOB_EXECUTED - assert events[3].code == EVENT_JOB_EXECUTED - def test_scheduler_double_start(self, scheduler): pytest.raises(SchedulerAlreadyRunningError, scheduler.start) @@ -458,162 +431,205 @@ class TestRunningScheduler(object): class SchedulerImplementationTestBase(object): - @pytest.fixture - def scheduler(self, request): - sched = self.create_scheduler() - request.addfinalizer(lambda: self.finish(sched)) - return sched - - def create_scheduler(self): - raise NotImplementedError - - def create_event(self): - return Event() + @pytest.fixture(autouse=True) + def executor(self, scheduler): + dummy = DummyPoolExecutor() + executor = BasePoolExecutor(scheduler, dummy) + scheduler.add_executor(executor) - def process_events(self): - pass - - def finish(self, sched): - pass - - def test_scheduler_implementation(self, scheduler): - """Tests that starting the scheduler eventually calls _process_jobs().""" - - class TimeRoller(object): - def __init__(self, start, step): - self.now = start - self.step = timedelta(seconds=step) - - def next(self): - return self.now + self.step + @pytest.fixture + def start_scheduler(self, request, scheduler): + def cleanup(): + if scheduler.running: + scheduler.shutdown() - def __call__(self): - now = self.now - self.now = self.next() - return now + request.addfinalizer(cleanup) + return scheduler.start - events = [] - vals = [0] - job_removed_event = self.create_event() - shutdown_event = self.create_event() - scheduler._threadpool = DummyThreadPool() - - # Test that pending jobs are added (and if due, executed) when the scheduler starts - scheduler._current_time = time_roller = TimeRoller(dummy_datetime, 0.2) - scheduler.add_listener(events.append) - scheduler.add_listener(lambda e: job_removed_event.set(), EVENT_JOBSTORE_JOB_REMOVED) - scheduler.add_job('date', increment, run_date=time_roller.next(), args=(vals,)) - scheduler.start() - self.process_events() - job_removed_event.wait(2) - assert job_removed_event.is_set() - assert vals[0] == 1 - assert len(events) == 5 - assert events[0].code == EVENT_JOBSTORE_ADDED - assert events[1].code == EVENT_JOBSTORE_JOB_ADDED - assert events[2].code == EVENT_SCHEDULER_START - assert events[3].code == EVENT_JOB_EXECUTED - assert events[4].code == EVENT_JOBSTORE_JOB_REMOVED - del events[:] - job_removed_event.clear() - - # Test that adding a job causes it to be executed after the specified delay - job = scheduler.add_job('date', increment, run_date=time_roller.next() + time_roller.step * 2, args=(vals,)) - self.process_events() - sleep(0.5) - self.process_events() - job_removed_event.wait(2) - assert job_removed_event.is_set() - assert vals[0] == 2 - assert len(events) == 3 - assert events[0].code == EVENT_JOBSTORE_JOB_ADDED - assert events[1].code == EVENT_JOB_EXECUTED - assert events[2].code == EVENT_JOBSTORE_JOB_REMOVED - del events[:] - job_removed_event.clear() + @pytest.fixture + def eventqueue(self, scheduler): + from six.moves.queue import Queue + events = Queue() + scheduler.add_listener(events.put) + return events + + def wait_event(self, queue): + return queue.get(True, 1) + + def test_add_pending_job(self, scheduler, freeze_time, eventqueue, start_scheduler): + """Tests that pending jobs are added (and if due, executed) when the scheduler starts.""" + + freeze_time.set_increment(timedelta(seconds=0.2)) + scheduler.add_job('date', lambda x, y: x + y, args=[1, 2], run_date=freeze_time.next()) + start_scheduler() + + assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED + assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_JOB_ADDED + assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START + event = self.wait_event(eventqueue) + assert event.code == EVENT_JOB_EXECUTED + assert event.retval == 3 + assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_JOB_REMOVED + + def test_add_live_job(self, scheduler, freeze_time, eventqueue, start_scheduler): + """Tests that adding a job causes it to be executed after the specified delay.""" + + freeze_time.set_increment(timedelta(seconds=0.2)) + start_scheduler() + assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED + assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START + + scheduler.add_job('date', lambda x, y: x + y, args=[1, 2], + run_date=freeze_time.next() + freeze_time.increment * 2) + assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_JOB_ADDED + event = self.wait_event(eventqueue) + assert event.code == EVENT_JOB_EXECUTED + assert event.retval == 3 + assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_JOB_REMOVED + + def test_shutdown(self, scheduler, eventqueue, start_scheduler): + """Tests that shutting down the scheduler emits the proper event.""" + + start_scheduler() + assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED + assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START - # Test that shutting down the scheduler emits the proper event - scheduler.add_listener(lambda e: shutdown_event.set(), EVENT_SCHEDULER_SHUTDOWN) scheduler.shutdown() - self.process_events() - shutdown_event.wait(2) - assert shutdown_event.is_set() - assert len(events) == 1 - assert events[0].code == EVENT_SCHEDULER_SHUTDOWN + assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_SHUTDOWN class TestBlockingScheduler(SchedulerImplementationTestBase): - def create_scheduler(self): - sched = BlockingScheduler() - self.thread = Thread(target=sched.start) - sched.start = self.thread.start - return sched + @pytest.fixture + def scheduler(self): + return BlockingScheduler() - def finish(self, sched): - self.thread.join() + @pytest.fixture + def start_scheduler(self, request, scheduler): + def cleanup(): + if scheduler.running: + scheduler.shutdown() + thread.join() + + request.addfinalizer(cleanup) + thread = Thread(target=scheduler.start) + return thread.start class TestBackgroundScheduler(SchedulerImplementationTestBase): - def create_scheduler(self): + @pytest.fixture + def scheduler(self): return BackgroundScheduler() class TestAsyncIOScheduler(SchedulerImplementationTestBase): - def create_scheduler(self): + @pytest.fixture + def event_loop(self): + asyncio = pytest.importorskip('asyncio') + return asyncio.new_event_loop() + + @pytest.fixture + def scheduler(self, event_loop): asyncio = pytest.importorskip('apscheduler.schedulers.asyncio') - sched = asyncio.AsyncIOScheduler() - self.thread = Thread(target=sched._eventloop.run_forever) - self.thread.start() - return sched + return asyncio.AsyncIOScheduler(event_loop=event_loop) + + @pytest.fixture + def start_scheduler(self, request, event_loop, scheduler): + def cleanup(): + if scheduler.running: + event_loop.call_soon_threadsafe(scheduler.shutdown) + event_loop.call_soon_threadsafe(event_loop.stop) + thread.join() - def finish(self, sched): - sched._eventloop.call_soon_threadsafe(sched._eventloop.stop) - self.thread.join() + event_loop.call_soon_threadsafe(scheduler.start) + request.addfinalizer(cleanup) + thread = Thread(target=event_loop.run_forever) + return thread.start class TestGeventScheduler(SchedulerImplementationTestBase): - def create_scheduler(self): + @pytest.fixture + def scheduler(self): gevent = pytest.importorskip('apscheduler.schedulers.gevent') return gevent.GeventScheduler() - def create_event(self): + @pytest.fixture + def calc_event(self): from gevent.event import Event return Event() + @pytest.fixture + def eventqueue(self, scheduler): + from gevent.queue import Queue + events = Queue() + scheduler.add_listener(events.put) + return events + class TestTornadoScheduler(SchedulerImplementationTestBase): - def create_scheduler(self): + @pytest.fixture + def io_loop(self): + ioloop = pytest.importorskip('tornado.ioloop') + return ioloop.IOLoop() + + @pytest.fixture + def scheduler(self, io_loop): tornado = pytest.importorskip('apscheduler.schedulers.tornado') - sched = tornado.TornadoScheduler() - self.thread = Thread(target=sched._ioloop.start) - self.thread.start() - return sched + return tornado.TornadoScheduler(io_loop=io_loop) + + @pytest.fixture + def start_scheduler(self, request, io_loop, scheduler): + def cleanup(): + if scheduler.running: + io_loop.add_callback(scheduler.shutdown) + io_loop.add_callback(io_loop.stop) + thread.join() - def finish(self, sched): - sched._ioloop.add_callback(sched._ioloop.stop) - self.thread.join() + io_loop.add_callback(scheduler.start) + request.addfinalizer(cleanup) + thread = Thread(target=io_loop.start) + return thread.start class TestTwistedScheduler(SchedulerImplementationTestBase): - def create_scheduler(self): + @pytest.fixture + def reactor(self): + selectreactor = pytest.importorskip('twisted.internet.selectreactor') + return selectreactor.SelectReactor() + + @pytest.fixture + def scheduler(self, reactor): twisted = pytest.importorskip('apscheduler.schedulers.twisted') - sched = twisted.TwistedScheduler() - self.thread = Thread(target=sched._reactor.run, args=(False,)) - self.thread.start() - return sched + return twisted.TwistedScheduler(reactor=reactor) + + @pytest.fixture + def start_scheduler(self, request, reactor, scheduler): + def cleanup(): + if scheduler.running: + reactor.callFromThread(scheduler.shutdown) + reactor.callFromThread(reactor.stop) + thread.join() - def finish(self, sched): - sched._reactor.callFromThread(sched._reactor.stop) - self.thread.join() + reactor.callFromThread(scheduler.start) + request.addfinalizer(cleanup) + thread = Thread(target=reactor.run, args=(False,)) + return thread.start +@pytest.mark.skip class TestQtScheduler(SchedulerImplementationTestBase): - def create_scheduler(self): + @pytest.fixture(scope='class') + def coreapp(self): + QtCore = pytest.importorskip('PySide.QtCore') + QtCore.QCoreApplication([]) + + @pytest.fixture + def scheduler(self, coreapp): qt = pytest.importorskip('apscheduler.schedulers.qt') - from PySide.QtCore import QCoreApplication - QCoreApplication([]) return qt.QtScheduler() - def process_events(self): + def wait_event(self, queue): from PySide.QtCore import QCoreApplication - QCoreApplication.processEvents() + + while queue.empty(): + QCoreApplication.processEvents() + return queue.get_nowait() diff --git a/tests/test_threadpool.py b/tests/test_threadpool.py deleted file mode 100644 index 1cbe44f..0000000 --- a/tests/test_threadpool.py +++ /dev/null @@ -1,54 +0,0 @@ -from threading import Event -from time import sleep - -import pytest - -from apscheduler.threadpool import ThreadPool - - -def test_threadpool(): - pool = ThreadPool(core_threads=2, keepalive=0) - event1 = Event() - event2 = Event() - event3 = Event() - pool.submit(event1.set) - pool.submit(event2.set) - pool.submit(event3.set) - event1.wait(1) - event2.wait(1) - event3.wait(1) - assert event1.isSet() - assert event2.isSet() - assert event3.isSet() - sleep(0.3) - assert repr(pool) == '<ThreadPool at %x; threads=2/20>' % id(pool) - - pool.shutdown() - assert repr(pool) == '<ThreadPool at %x; threads=0/20>' % id(pool) - - # Make sure double shutdown is ok - pool.shutdown() - - # Make sure one can't submit tasks to a thread pool that has been shut down - pytest.raises(RuntimeError, pool.submit, event1.set) - - -def test_threadpool_maxthreads(): - pool = ThreadPool(core_threads=2, max_threads=1) - assert pool.max_threads == 2 - - pool = ThreadPool(core_threads=2, max_threads=3) - assert pool.max_threads == 3 - - pool = ThreadPool(core_threads=0, max_threads=0) - assert pool.max_threads == 1 - - -def test_threadpool_nocore(): - pool = ThreadPool(keepalive=0) - event = Event() - pool.submit(event.set) - event.wait(1) - assert event.isSet() - sleep(1) - assert repr(pool) == '<ThreadPool at %x; threads=0/20>' % id(pool) diff --git a/tests/test_util.py b/tests/test_util.py index 437c151..03e3c34 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -142,41 +142,6 @@ def test_timedelta_seconds(): assert seconds == 150 -def test_time_difference_positive(): - earlier = datetime(2008, 9, 1, second=3) - later = datetime(2008, 9, 1, second=49) - assert time_difference(later, earlier) == 46 - - -def test_time_difference_negative(): - earlier = datetime(2009, 4, 7, second=7) - later = datetime(2009, 4, 7, second=56) - assert time_difference(earlier, later) == -49 - - -class TestDSTTimeDifference(object): - @pytest.fixture(scope='class', autouse=True) - def timezone(self, request): - def finish(): - del os.environ['TZ'] - time.tzset() - - if hasattr(time, 'tzset'): - os.environ['TZ'] = 'Europe/Helsinki' - time.tzset() - request.addfinalizer(finish) - - def test_time_difference_daylight_1(self): - earlier = datetime(2010, 3, 28, 2) - later = datetime(2010, 3, 28, 4) - assert time_difference(later, earlier) == 3600 - - def test_time_difference_daylight_2(self): - earlier = datetime(2010, 10, 31, 2) - later = datetime(2010, 10, 31, 5) - assert time_difference(later, earlier) == 14400 - - def test_datetime_ceil_round(): dateval = datetime(2009, 4, 7, 2, 10, 16, 4000) correct_answer = datetime(2009, 4, 7, 2, 10, 17) @@ -6,7 +6,6 @@ deps=pytest pytest-cov sqlalchemy pymongo - redis [testenv] deps={[base]deps} @@ -14,7 +13,8 @@ deps={[base]deps} twisted gevent tornado -commands=py.test [] + trollius +commands=py.test -rsx [testenv:py32] deps={[testenv:py34]deps} @@ -37,6 +37,7 @@ deps={[base]deps} mock tornado twisted + trollius [testenv:pep8] deps=pep8 |