Diffstat:
-rw-r--r--  .gitignore                             1
-rw-r--r--  apscheduler/executors/__init__.py      0
-rw-r--r--  apscheduler/executors/base.py         29
-rw-r--r--  apscheduler/executors/pool.py         87
-rw-r--r--  apscheduler/job.py                    48
-rw-r--r--  apscheduler/jobstores/sqlalchemy.py    4
-rw-r--r--  apscheduler/schedulers/base.py       185
-rw-r--r--  apscheduler/threadpool.py            124
-rw-r--r--  apscheduler/util.py                   21
-rw-r--r--  examples/executors/processpool.py     24
-rw-r--r--  examples/jobstores/mongodb.py          8
-rw-r--r--  examples/jobstores/sqlalchemy_.py      4
-rw-r--r--  examples/misc/reference.py             2
-rw-r--r--  examples/schedulers/asyncio_.py        2
-rw-r--r--  examples/schedulers/background.py      2
-rw-r--r--  examples/schedulers/blocking.py        2
-rw-r--r--  examples/schedulers/gevent_.py         2
-rw-r--r--  examples/schedulers/qt.py              2
-rw-r--r--  examples/schedulers/tornado_.py        2
-rw-r--r--  examples/schedulers/twisted_.py        2
-rw-r--r--  setup.py                               6
-rw-r--r--  tests/conftest.py                     66
-rw-r--r--  tests/test_executors.py               46
-rw-r--r--  tests/test_job.py                     39
-rw-r--r--  tests/test_jobstores.py              103
-rw-r--r--  tests/test_schedulers.py             388
-rw-r--r--  tests/test_threadpool.py              54
-rw-r--r--  tests/test_util.py                    35
-rw-r--r--  tox.ini                                5
29 files changed, 652 insertions, 641 deletions
diff --git a/.gitignore b/.gitignore
index 8164ad4..fb62dcf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@ dist/
build/
.coverage
.cache/
+example.sqlite
diff --git a/apscheduler/executors/__init__.py b/apscheduler/executors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/apscheduler/executors/__init__.py
diff --git a/apscheduler/executors/base.py b/apscheduler/executors/base.py
new file mode 100644
index 0000000..e9c5d55
--- /dev/null
+++ b/apscheduler/executors/base.py
@@ -0,0 +1,29 @@
+from abc import ABCMeta, abstractmethod
+
+import six
+
+
+class MaxInstancesReachedError(Exception):
+ pass
+
+
+class BaseExecutor(six.with_metaclass(ABCMeta, object)):
+ """Base class of all executors."""
+
+ @abstractmethod
+ def submit_job(self, job, run_times):
+ """
+ Submits job for execution.
+
+ :param job: job to execute
+ :param run_times: list of `~datetime.datetime` objects specifying when the job should have been run
+ :type job: `~apscheduler.scheduler.job.Job`
+ :type run_times: list
+ """
+
+ def shutdown(self, wait=True):
+ """
+ Shuts down this executor.
+
+ :param wait: ``True`` to wait until all submitted jobs have been executed
+ """
diff --git a/apscheduler/executors/pool.py b/apscheduler/executors/pool.py
new file mode 100644
index 0000000..f253168
--- /dev/null
+++ b/apscheduler/executors/pool.py
@@ -0,0 +1,87 @@
+from collections import defaultdict
+from datetime import timedelta, datetime
+from functools import partial
+import concurrent.futures
+import logging
+import sys
+
+from dateutil.tz import tzutc
+
+from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError
+from apscheduler.events import JobEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
+
+logger = logging.getLogger(__name__)
+utc = tzutc()
+
+
+def run_job(job, run_times):
+ events = []
+ for run_time in run_times:
+ # See if the job missed its run time window, and handle possible misfires accordingly
+ difference = datetime.now(utc) - run_time
+ grace_time = timedelta(seconds=job.misfire_grace_time)
+ if difference > grace_time:
+ events.append(JobEvent(EVENT_JOB_MISSED, job, run_time))
+ logger.warning('Run time of job "%s" was missed by %s', job, difference)
+ continue
+
+ logger.info('Running job "%s" (scheduled at %s)', job, run_time)
+ try:
+ retval = job.func(*job.args, **job.kwargs)
+ except:
+ exc, tb = sys.exc_info()[1:]
+ events.append(JobEvent(EVENT_JOB_ERROR, job, run_time, exception=exc, traceback=tb))
+ logger.exception('Job "%s" raised an exception', job)
+ else:
+ events.append(JobEvent(EVENT_JOB_EXECUTED, job, run_time, retval=retval))
+ logger.info('Job "%s" executed successfully', job)
+
+ return events
+
+
+class BasePoolExecutor(BaseExecutor):
+ def __init__(self, scheduler, executor, logger=None):
+ super(BasePoolExecutor, self).__init__()
+ self._instances = defaultdict(lambda: 0)
+ self._scheduler = scheduler
+ self._executor = executor
+ self._logger = logger or logging.getLogger(__name__)
+ self._lock = scheduler._create_lock()
+
+ def submit_job(self, job, run_times):
+ with self._lock:
+ if self._instances[job.id] >= job.max_instances:
+ raise MaxInstancesReachedError
+ f = self._executor.submit(run_job, job, run_times)
+ f.add_done_callback(partial(self.job_finished, job.id))
+ self._instances[job.id] += 1
+
+ def job_finished(self, job_id, future):
+ with self._lock:
+ self._instances[job_id] -= 1
+
+ # Dispatch all the events that the worker produced while running the target function
+ try:
+ events = future.result()
+ except Exception as e:
+ logger.exception('Error executing job %s' % job_id)
+ else:
+ for event in events:
+ self._scheduler._notify_listeners(event)
+
+ def shutdown(self, wait=True):
+ self._executor.shutdown(wait)
+
+
+class ThreadPoolExecutor(BasePoolExecutor):
+ def __init__(self, scheduler, max_workers=20):
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers)
+ logger = logging.getLogger(__name__)
+ super(ThreadPoolExecutor, self).__init__(scheduler, executor, logger)
+
+
+class ProcessPoolExecutor(BasePoolExecutor):
+ def __init__(self, scheduler, max_workers=5):
+ executor = concurrent.futures.ProcessPoolExecutor(max_workers)
+ logger = logging.getLogger(__name__)
+ super(ProcessPoolExecutor, self).__init__(scheduler, executor, logger)
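[Note: taken together with the scheduler changes below, an executor is registered under an alias and individual jobs opt into it via the new executor argument. A sketch; `tick` is an assumed callable, not defined in this commit:]

    from apscheduler.executors.pool import ProcessPoolExecutor
    from apscheduler.schedulers.background import BackgroundScheduler

    scheduler = BackgroundScheduler()
    # Register a second executor alongside the default thread pool
    scheduler.add_executor(ProcessPoolExecutor(scheduler, max_workers=4), 'processpool')
    # Route this job to the process pool by alias
    scheduler.add_job('interval', tick, seconds=3, executor='processpool')
    scheduler.start()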
diff --git a/apscheduler/job.py b/apscheduler/job.py
index 1a3a57c..12a8b14 100644
--- a/apscheduler/job.py
+++ b/apscheduler/job.py
@@ -1,6 +1,5 @@
from collections import Iterable, Mapping
from inspect import isfunction, ismethod
-from threading import Lock
from datetime import timedelta, datetime
from uuid import uuid4
@@ -15,31 +14,29 @@ except ImportError:
from inspect import getargspec
-class MaxInstancesReachedError(Exception):
- pass
-
-
class Job(object):
"""
Encapsulates the actual Job along with its metadata. This class is used internally by APScheduler, and should never
be instantiated by the user.
"""
- __slots__ = ('_lock', 'id', 'trigger', 'func', 'func_ref', 'args', 'kwargs', 'name', 'misfire_grace_time',
- 'coalesce', 'max_runs', 'max_instances', 'runs', 'instances', 'next_run_time')
+ __slots__ = ('id', 'trigger', 'executor', 'func', 'func_ref', 'args', 'kwargs', 'name', 'misfire_grace_time',
+ 'coalesce', 'max_runs', 'max_instances', 'runs', 'next_run_time')
trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
trigger_classes = {}
- def __init__(self, **kwargs):
+ def __init__(self, id=None, **kwargs):
super(Job, self).__init__()
- self.instances = 0
self.runs = 0
self.next_run_time = None
+ if id and not isinstance(id, six.string_types):
+ raise TypeError("id must be a nonempty string")
+ self.id = id or uuid4().hex
+
changes = self.validate_changes(kwargs)
self.modify(changes)
- self._lock = Lock()
def modify(self, changes):
for key, value in six.iteritems(changes):
@@ -50,14 +47,6 @@ class Job(object):
approved = {}
- if 'id' in changes:
- value = changes.pop('id')
- if value is None:
- value = uuid4().hex
- elif not isinstance(value, six.string_types):
- raise TypeError("id must be a nonempty string")
- approved['id'] = value
-
if 'func' in changes or 'args' in changes or 'kwargs' in changes:
func = changes.pop('func') if 'func' in changes else self.func
args = changes.pop('args') if 'args' in changes else self.args
@@ -147,6 +136,12 @@ class Job(object):
approved['trigger'] = trigger
+ if 'executor' in changes:
+ value = changes.pop('executor')
+ if not isinstance(value, six.string_types):
+ raise TypeError('executor must be a string')
+ approved['executor'] = value
+
if 'next_run_time' in changes:
value = changes.pop('next_run_time')
if not isinstance(value, datetime):
@@ -215,23 +210,13 @@ class Job(object):
return run_times
- def add_instance(self):
- with self._lock:
- if self.instances == self.max_instances:
- raise MaxInstancesReachedError
- self.instances += 1
-
- def remove_instance(self):
- with self._lock:
- assert self.instances > 0, 'Already at 0 instances'
- self.instances -= 1
-
def __getstate__(self):
return {
'version': 1,
'id': self.id,
'func': self.func_ref,
'trigger': self.trigger,
+ 'executor': self.executor,
'args': self.args,
'kwargs': self.kwargs,
'name': self.name,
@@ -245,13 +230,13 @@ class Job(object):
def __setstate__(self, state):
if state.get('version', 1) > 1:
- raise ValueError('Job has version %s, but only version 1 and lower can be handled' % state['version'])
+ raise ValueError('Job has version %s, but only version 1 can be handled' % state['version'])
- self._lock = Lock()
self.id = state['id']
self.func_ref = state['func']
self.func = ref_to_obj(self.func_ref)
self.trigger = state['trigger']
+ self.executor = state['executor']
self.args = state['args']
self.kwargs = state['kwargs']
self.name = state['name']
@@ -261,7 +246,6 @@ class Job(object):
self.max_instances = state['max_instances']
self.runs = state['runs']
self.next_run_time = state['next_run_time']
- self.instances = 0
def __eq__(self, other):
if isinstance(other, Job):
diff --git a/apscheduler/jobstores/sqlalchemy.py b/apscheduler/jobstores/sqlalchemy.py
index 7319761..8dd5d8b 100644
--- a/apscheduler/jobstores/sqlalchemy.py
+++ b/apscheduler/jobstores/sqlalchemy.py
@@ -30,7 +30,7 @@ class SQLAlchemyJobStore(BaseJobStore):
if engine:
self.engine = maybe_ref(engine)
elif url:
- self.engine = create_engine(url, echo=True)
+ self.engine = create_engine(url)
else:
raise ValueError('Need either "engine" or "url" defined')
@@ -40,7 +40,7 @@ class SQLAlchemyJobStore(BaseJobStore):
pickle_coltype = PickleType(pickle_protocol)
self.jobs_t = Table(
tablename, metadata,
- Column('id', Unicode(1024), primary_key=True),
+ Column('id', Unicode(1024, _warn_on_bytestring=False), primary_key=True),
Column('next_run_time', BigInteger, index=True),
Column('job_data', pickle_coltype, nullable=False)
)
diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py
index f4521b5..509604a 100644
--- a/apscheduler/schedulers/base.py
+++ b/apscheduler/schedulers/base.py
@@ -9,11 +9,12 @@ from dateutil.tz import tzlocal
import six
from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
-from apscheduler.util import *
+from apscheduler.executors.base import MaxInstancesReachedError
+from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
-from apscheduler.job import Job, MaxInstancesReachedError, JobHandle
+from apscheduler.job import Job, JobHandle
from apscheduler.events import *
-from apscheduler.threadpool import ThreadPool
+from apscheduler.util import *
class BaseScheduler(six.with_metaclass(ABCMeta)):
@@ -27,6 +28,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
def __init__(self, gconfig={}, **options):
super(BaseScheduler, self).__init__()
+ self._executors = {}
+ self._executors_lock = self._create_lock()
self._jobstores = {}
self._jobstores_lock = self._create_lock()
self._listeners = []
@@ -52,9 +55,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
if self.running:
raise SchedulerAlreadyRunningError
- # Create a RAMJobStore as the default if there is no default job store
+ # Create a default executor if nothing else is configured
+ if not 'default' in self._executors:
+ self.add_executor(self._create_default_executor(), 'default')
+
+ # Create a default job store if nothing else is configured
if not 'default' in self._jobstores:
- self.add_jobstore(MemoryJobStore(), 'default', True)
+ self.add_jobstore(self._create_default_jobstore(), 'default', True)
# Schedule all pending jobs
for job, jobstore in self._pending_jobs:
@@ -79,8 +86,9 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
self._stopped = True
- # Shut down the thread pool
- self._threadpool.shutdown(wait)
+ # Shut down all executors
+ for executor in six.itervalues(self._executors):
+ executor.shutdown(wait)
# Close all job stores
for jobstore in six.itervalues(self._jobstores):
@@ -93,7 +101,23 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
def running(self):
return not self._stopped
- def add_jobstore(self, jobstore, alias, quiet=False):
+ def add_executor(self, executor, alias='default'):
+ """
+ Adds an executor to this scheduler.
+
+ :param executor: the executor instance to be added
+ :param alias: alias for the executor
+ :type executor: `~apscheduler.executors.base.BaseExecutor`
+ :type alias: str
+ """
+
+ with self._executors_lock:
+ if alias in self._executors:
+ raise KeyError('This scheduler already has an executor by the alias of "%s"' % alias)
+ executor.scheduler = self
+ self._executors[alias] = executor
+
+ def add_jobstore(self, jobstore, alias='default', quiet=False):
"""
Adds a job store to this scheduler.
@@ -156,7 +180,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
del self._listeners[i]
def add_job(self, trigger, func, args=None, kwargs=None, id=None, name=None, misfire_grace_time=None, coalesce=None,
- max_runs=None, max_instances=1, jobstore='default', **trigger_args):
+ max_runs=None, max_instances=1, jobstore='default', executor='default', **trigger_args):
"""
Adds the given job to the job list and notifies the scheduler thread.
@@ -182,6 +206,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
:param max_runs: maximum number of times this job is allowed to be triggered
:param max_instances: maximum number of concurrently running instances allowed for this job
:param jobstore: alias of the job store to store the job in
+ :param executor: alias of the executor to run the job with
:type id: str/unicode
:type args: list/tuple
:type jobstore: str/unicode
@@ -190,6 +215,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
:type coalesce: bool
:type max_runs: int
:type max_instances: int
+ :type jobstore: str
+ :type executor: str
:rtype: :class:`~apscheduler.job.JobHandle`
"""
@@ -198,6 +225,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
job_kwargs = {
'trigger': trigger,
'trigger_args': trigger_args,
+ 'executor': executor,
'func': func,
'args': tuple(args) if args is not None else (),
'kwargs': dict(kwargs) if kwargs is not None else {},
@@ -220,12 +248,12 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
return JobHandle(self, jobstore, job)
def scheduled_job(self, trigger, args=None, kwargs=None, id=None, name=None, misfire_grace_time=None, coalesce=None,
- max_runs=None, max_instances=1, jobstore='default', **trigger_args):
+ max_runs=None, max_instances=1, jobstore='default', executor='default', **trigger_args):
"""A decorator version of :meth:`add_job`."""
def inner(func):
self.add_job(trigger, func, args, kwargs, id, misfire_grace_time, coalesce, name, max_runs,
- max_instances, jobstore, **trigger_args)
+ max_instances, jobstore, executor, **trigger_args)
return func
return inner
@@ -360,12 +388,19 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
self.coalesce = asbool(config.pop('coalesce', True))
self.timezone = astimezone(config.pop('timezone', None)) or tzlocal()
- # Configure the thread pool
- if 'threadpool' in config:
- self._threadpool = maybe_ref(config['threadpool'])
- else:
- threadpool_opts = combine_opts(config, 'threadpool.')
- self._threadpool = ThreadPool(**threadpool_opts)
+ # Configure executors
+ executor_opts = combine_opts(config, 'executor.')
+ executors = {}
+ for key, value in executor_opts.items():
+ store_name, option = key.split('.', 1)
+ opts_dict = executors.setdefault(store_name, {})
+ opts_dict[option] = value
+
+ for alias, opts in executors.items():
+ classname = opts.pop('class')
+ cls = maybe_ref(classname)
+ executor = cls(**opts)
+ self.add_executor(executor, alias)
# Configure job stores
jobstore_opts = combine_opts(config, 'jobstore.')
@@ -381,6 +416,18 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
jobstore = cls(**opts)
self.add_jobstore(jobstore, alias, True)
+ def _create_default_jobstore(self):
+ return MemoryJobStore()
+
+ def _create_default_executor(self):
+ return ThreadPoolExecutor(self)
+
+ def _lookup_executor(self, executor):
+ try:
+ return self._executors[executor]
+ except KeyError:
+ raise KeyError('No such executor: %s' % executor)
+
def _notify_listeners(self, event):
with self._listeners_lock:
listeners = tuple(self._listeners)
@@ -393,8 +440,9 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
self.logger.exception('Error notifying listener')
def _real_add_job(self, job, jobstore, wakeup):
- # Recalculate the next run time
- job.next_run_time = job.trigger.get_next_fire_time(self._current_time())
+ # Calculate the next run time
+ now = datetime.now(self.timezone)
+ job.next_run_time = job.trigger.get_next_fire_time(now)
# Add the job to the given job store
store = self._jobstores.get(jobstore)
@@ -417,70 +465,22 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
"""Triggers :meth:`_process_jobs` to be run in an implementation specific manner."""
def _create_lock(self):
- return RLock()
-
- def _current_time(self):
- return datetime.now(self.timezone)
-
- def _run_job(self, job, run_times):
- """Acts as a harness that runs the actual job code in the thread pool."""
-
- for run_time in run_times:
- # See if the job missed its run time window, and handle possible
- # misfires accordingly
- difference = self._current_time() - run_time
- grace_time = timedelta(seconds=job.misfire_grace_time)
- if difference > grace_time:
- # Notify listeners about a missed run
- event = JobEvent(EVENT_JOB_MISSED, job, run_time)
- self._notify_listeners(event)
- self.logger.warning('Run time of job "%s" was missed by %s', job, difference)
- else:
- try:
- job.add_instance()
- except MaxInstancesReachedError:
- event = JobEvent(EVENT_JOB_MISSED, job, run_time)
- self._notify_listeners(event)
- self.logger.warning(
- 'Execution of job "%s" skipped: maximum number of running instances reached (%d)', job,
- job.max_instances)
- break
-
- self.logger.info('Running job "%s" (scheduled at %s)', job, run_time)
-
- try:
- retval = job.func(*job.args, **job.kwargs)
- except:
- # Notify listeners about the exception
- exc, tb = sys.exc_info()[1:]
- event = JobEvent(EVENT_JOB_ERROR, job, run_time, exception=exc, traceback=tb)
- self._notify_listeners(event)
+ """Creates a reentrant lock object."""
- self.logger.exception('Job "%s" raised an exception', job)
- else:
- # Notify listeners about successful execution
- event = JobEvent(EVENT_JOB_EXECUTED, job, run_time, retval=retval)
- self._notify_listeners(event)
-
- self.logger.info('Job "%s" executed successfully', job)
-
- job.remove_instance()
-
- # If coalescing is enabled, don't attempt any further runs
- if job.coalesce:
- break
+ return RLock()
def _process_jobs(self):
- """Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for
- the next round.
+ """
+ Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for the next
+ round.
"""
self.logger.debug('Looking for jobs to run')
- now = self._current_time()
+ now = datetime.now(self.timezone)
next_wakeup_time = None
with self._jobstores_lock:
- for alias, jobstore in six.iteritems(self._jobstores):
+ for jobstore_alias, jobstore in six.iteritems(self._jobstores):
jobs, jobstore_next_wakeup_time = jobstore.get_pending_jobs(now)
if not next_wakeup_time:
next_wakeup_time = jobstore_next_wakeup_time
@@ -488,27 +488,42 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
next_wakeup_time = min(next_wakeup_time, jobstore_next_wakeup_time)
for job in jobs:
+ # Look up the job's executor
+ try:
+ executor = self._lookup_executor(job.executor)
+ except:
+ self.logger.error('Executor lookup failed for job "%s": %s', job, job.executor)
+ continue
+
run_times = job.get_run_times(now)
+ run_times = run_times[-1:] if run_times and job.coalesce else run_times
if run_times:
- self._threadpool.submit(self._run_job, job, run_times)
-
- # Update the job, but don't keep finished jobs around
- job_runs = job.runs + 1 if job.coalesce else len(run_times)
+ try:
+ executor.submit_job(job, run_times)
+ except MaxInstancesReachedError:
+ self.logger.warning(
+ 'Execution of job "%s" skipped: maximum number of running instances reached (%d)',
+ job, job.max_instances)
+ continue
+ except:
+ self.logger.exception('Error submitting job "%s" to executor "%s"', job, job.executor)
+ continue
+
+ # Update the job if it has a next execution time and the number of runs has not reached maximum,
+ # otherwise remove it from the job store
+ job_runs = job.runs + len(run_times)
job_next_run = job.trigger.get_next_fire_time(now + timedelta(microseconds=1))
if job_next_run and (job.max_runs is None or job_runs < job.max_runs):
changes = {'next_run_time': job_next_run, 'runs': job_runs}
jobstore.modify_job(job.id, changes)
+ next_wakeup_time = min(next_wakeup_time, job_next_run) if next_wakeup_time else job_next_run
else:
- self.remove_job(job.id, alias)
-
- if not next_wakeup_time:
- next_wakeup_time = job.next_run_time
- elif job.next_run_time:
- next_wakeup_time = min(next_wakeup_time, job.next_run_time)
+ self.remove_job(job.id, jobstore_alias)
# Determine the delay until this method should be called again
if next_wakeup_time is not None:
- wait_seconds = time_difference(next_wakeup_time, now)
+ wait_seconds = timedelta_seconds(next_wakeup_time - now)
+ self.logger.debug('now = %s, next_wakeup_time = %s', now, next_wakeup_time)
self.logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds)
else:
wait_seconds = None
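[Note: configure() now groups dotted executor.<alias>.<option> keys the same way it already handles jobstore.<alias>.<option>. A standalone sketch of just the grouping step shown above; the alias and values are illustrative:]

    # Each per-alias dict then has its 'class' popped and is passed to cls(**opts).
    executor_opts = {
        'processpool.class': 'apscheduler.executors.pool:ProcessPoolExecutor',
        'processpool.max_workers': '4',
    }
    executors = {}
    for key, value in executor_opts.items():
        alias, option = key.split('.', 1)
        executors.setdefault(alias, {})[option] = value
    assert executors == {'processpool': {'class': 'apscheduler.executors.pool:ProcessPoolExecutor',
                                         'max_workers': '4'}}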
diff --git a/apscheduler/threadpool.py b/apscheduler/threadpool.py
deleted file mode 100644
index 6419941..0000000
--- a/apscheduler/threadpool.py
+++ /dev/null
@@ -1,124 +0,0 @@
-"""
-Generic thread pool class. Modeled after Java's ThreadPoolExecutor.
-Please note that this ThreadPool does *not* fully implement the PEP 3148 ThreadPool!
-"""
-
-from threading import Thread, Lock, currentThread
-from weakref import ref
-import logging
-import atexit
-
-try:
- from queue import Queue, Empty
-except ImportError:
- from Queue import Queue, Empty
-
-logger = logging.getLogger(__name__)
-_threadpools = set()
-
-
-# Worker threads are daemonic in order to let the interpreter exit without an explicit shutdown of the thread pool.
-# The following trick is necessary to allow worker threads to finish cleanly.
-def _shutdown_all():
- for pool_ref in tuple(_threadpools):
- pool = pool_ref()
- if pool:
- pool.shutdown()
-
-atexit.register(_shutdown_all)
-
-
-class ThreadPool(object):
- def __init__(self, core_threads=0, max_threads=20, keepalive=1):
- """
- :param core_threads: maximum number of persistent threads in the pool
- :param max_threads: maximum number of total threads in the pool
- :param thread_class: callable that creates a Thread object
- :param keepalive: seconds to keep non-core worker threads waiting for new tasks
- """
- self.core_threads = core_threads
- self.max_threads = max(max_threads, core_threads, 1)
- self.keepalive = keepalive
- self._queue = Queue()
- self._threads_lock = Lock()
- self._threads = set()
- self._shutdown = False
-
- _threadpools.add(ref(self))
- logger.info('Started thread pool with %d core threads and %s maximum threads', core_threads,
- max_threads or 'unlimited')
-
- def _adjust_threadcount(self):
- with self._threads_lock:
- if self.num_threads < self.max_threads:
- self._add_thread(self.num_threads < self.core_threads)
-
- def _add_thread(self, core):
- t = Thread(target=self._run_jobs, args=(core,))
- t.setDaemon(True)
- t.start()
- self._threads.add(t)
-
- def _run_jobs(self, core):
- logger.debug('Started worker thread')
- block = True
- timeout = None
- if not core:
- block = self.keepalive > 0
- timeout = self.keepalive
-
- while True:
- try:
- func, args, kwargs = self._queue.get(block, timeout)
- except Empty:
- break
-
- if self._shutdown:
- break
-
- try:
- func(*args, **kwargs)
- except:
- logger.exception('Error in worker thread')
-
- with self._threads_lock:
- self._threads.remove(currentThread())
-
- logger.debug('Exiting worker thread')
-
- @property
- def num_threads(self):
- return len(self._threads)
-
- def submit(self, func, *args, **kwargs):
- if self._shutdown:
- raise RuntimeError('Cannot schedule new tasks after shutdown')
-
- self._queue.put((func, args, kwargs))
- self._adjust_threadcount()
-
- def shutdown(self, wait=True):
- if self._shutdown:
- return
-
- logging.info('Shutting down thread pool')
- self._shutdown = True
- _threadpools.remove(ref(self))
-
- with self._threads_lock:
- for _ in range(self.num_threads):
- self._queue.put((None, None, None))
-
- if wait:
- with self._threads_lock:
- threads = tuple(self._threads)
- for thread in threads:
- thread.join()
-
- def __repr__(self):
- if self.max_threads:
- threadcount = '%d/%d' % (self.num_threads, self.max_threads)
- else:
- threadcount = '%d' % self.num_threads
-
- return '<ThreadPool at %x; threads=%s>' % (id(self), threadcount)
diff --git a/apscheduler/util.py b/apscheduler/util.py
index c5a5e3e..170ff99 100644
--- a/apscheduler/util.py
+++ b/apscheduler/util.py
@@ -4,15 +4,14 @@ This module contains several handy functions primarily meant for internal use.
from datetime import date, datetime, timedelta, tzinfo
from calendar import timegm
-from time import mktime
import re
from dateutil.tz import gettz, tzutc
import six
__all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'datetime_to_utc_timestamp',
- 'utc_timestamp_to_datetime', 'timedelta_seconds', 'time_difference', 'datetime_ceil', 'combine_opts',
- 'get_callable_name', 'obj_to_ref', 'ref_to_obj', 'maybe_ref')
+ 'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'combine_opts', 'get_callable_name',
+ 'obj_to_ref', 'ref_to_obj', 'maybe_ref')
def asint(text):
@@ -121,22 +120,6 @@ def timedelta_seconds(delta):
delta.microseconds / 1000000.0
-def time_difference(date1, date2):
- """
- Returns the time difference in seconds between the given two datetime objects.
- The difference is calculated as: date1 - date2.
-
- :param date1: the later datetime
- :type date1: datetime
- :param date2: the earlier datetime
- :type date2: datetime
- :rtype: float
- """
- later = mktime(date1.timetuple()) + date1.microsecond / 1000000.0
- earlier = mktime(date2.timetuple()) + date2.microsecond / 1000000.0
- return later - earlier
-
-
def datetime_ceil(dateval):
"""
Rounds the given datetime object upwards.
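[Note: with time_difference() removed, callers subtract timezone-aware datetimes directly and feed the resulting timedelta to timedelta_seconds(), as _process_jobs() now does. For example:]

    # Aware datetimes subtract correctly across zones, so the old
    # mktime-based wall-clock helper is unnecessary.
    from datetime import datetime, timedelta
    from dateutil.tz import tzutc
    from apscheduler.util import timedelta_seconds

    now = datetime(2011, 4, 3, 18, 40, tzinfo=tzutc())
    wakeup = now + timedelta(seconds=90, microseconds=500000)
    print(timedelta_seconds(wakeup - now))  # 90.5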
diff --git a/examples/executors/processpool.py b/examples/executors/processpool.py
new file mode 100644
index 0000000..e76eaa6
--- /dev/null
+++ b/examples/executors/processpool.py
@@ -0,0 +1,24 @@
+"""
+Demonstrates how to schedule a job to be run in a process pool on 3 second intervals.
+"""
+
+from datetime import datetime
+from apscheduler.executors.pool import ProcessPoolExecutor
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+
+def tick():
+ print('Tick! The time is: %s' % datetime.now())
+
+
+if __name__ == '__main__':
+ scheduler = BlockingScheduler()
+ scheduler.add_executor(ProcessPoolExecutor(scheduler))
+ scheduler.add_job('interval', tick, seconds=3)
+ print('Press Ctrl+C to exit')
+
+ try:
+ scheduler.start()
+ except (KeyboardInterrupt, SystemExit):
+ pass
diff --git a/examples/jobstores/mongodb.py b/examples/jobstores/mongodb.py
index 49f1f0b..c6895e8 100644
--- a/examples/jobstores/mongodb.py
+++ b/examples/jobstores/mongodb.py
@@ -18,9 +18,13 @@ def alarm(time):
if __name__ == '__main__':
scheduler = BlockingScheduler()
- scheduler.add_jobstore(MongoDBJobStore(collection='example_jobs'))
+ jobstore = MongoDBJobStore(collection='example_jobs')
+ if len(sys.argv) > 1 and sys.argv[1] == '--clear':
+ jobstore.remove_all_jobs()
+
+ scheduler.add_jobstore(jobstore)
alarm_time = datetime.now() + timedelta(seconds=10)
- scheduler.add_job(alarm, 'date', [alarm_time], args=[datetime.now()])
+ scheduler.add_job('date', alarm, run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, run this example with the --clear argument.')
print('Press Ctrl+C to exit')
diff --git a/examples/jobstores/sqlalchemy_.py b/examples/jobstores/sqlalchemy_.py
index 561c3f8..4423d11 100644
--- a/examples/jobstores/sqlalchemy_.py
+++ b/examples/jobstores/sqlalchemy_.py
@@ -18,10 +18,10 @@ def alarm(time):
if __name__ == '__main__':
scheduler = BlockingScheduler()
- url = sys.argv[1] or 'sqlite://example.sqlite'
+ url = sys.argv[1] if len(sys.argv) > 1 else 'sqlite:///example.sqlite'
scheduler.add_jobstore(SQLAlchemyJobStore(url))
alarm_time = datetime.now() + timedelta(seconds=10)
- scheduler.add_job(alarm, 'date', [alarm_time], args=[datetime.now()])
+ scheduler.add_job('date', alarm, run_date=alarm_time, args=[datetime.now()])
print('To clear the alarms, delete the example.sqlite file.')
print('Press Ctrl+C to exit')
diff --git a/examples/misc/reference.py b/examples/misc/reference.py
index 2b0c414..8bb41bf 100644
--- a/examples/misc/reference.py
+++ b/examples/misc/reference.py
@@ -7,7 +7,7 @@ from apscheduler.schedulers.blocking import BlockingScheduler
if __name__ == '__main__':
scheduler = BlockingScheduler()
- scheduler.add_job('sys:stdout.write', 'interval', {'seconds': 3}, args=['tick\n'])
+ scheduler.add_job('interval', 'sys:stdout.write', seconds=3, args=['tick\n'])
print('Press Ctrl+C to exit')
try:
diff --git a/examples/schedulers/asyncio_.py b/examples/schedulers/asyncio_.py
index 01cc138..df6574e 100644
--- a/examples/schedulers/asyncio_.py
+++ b/examples/schedulers/asyncio_.py
@@ -14,7 +14,7 @@ def tick():
if __name__ == '__main__':
scheduler = AsyncIOScheduler()
- scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.add_job('interval', tick, seconds=3)
scheduler.start()
print('Press Ctrl+C to exit')
diff --git a/examples/schedulers/background.py b/examples/schedulers/background.py
index 8a3dde5..d9862e9 100644
--- a/examples/schedulers/background.py
+++ b/examples/schedulers/background.py
@@ -14,7 +14,7 @@ def tick():
if __name__ == '__main__':
scheduler = BackgroundScheduler()
- scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.add_job('interval', tick, seconds=3)
scheduler.start()
print('Press Ctrl+C to exit')
diff --git a/examples/schedulers/blocking.py b/examples/schedulers/blocking.py
index 55d1385..359b366 100644
--- a/examples/schedulers/blocking.py
+++ b/examples/schedulers/blocking.py
@@ -13,7 +13,7 @@ def tick():
if __name__ == '__main__':
scheduler = BlockingScheduler()
- scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.add_job('interval', tick, seconds=3)
print('Press Ctrl+C to exit')
try:
diff --git a/examples/schedulers/gevent_.py b/examples/schedulers/gevent_.py
index 597aa4d..bf83fb3 100644
--- a/examples/schedulers/gevent_.py
+++ b/examples/schedulers/gevent_.py
@@ -13,7 +13,7 @@ def tick():
if __name__ == '__main__':
scheduler = GeventScheduler()
- scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.add_job('interval', tick, seconds=3)
g = scheduler.start() # g is the greenlet that runs the scheduler loop
print('Press Ctrl+C to exit')
diff --git a/examples/schedulers/qt.py b/examples/schedulers/qt.py
index 15b9192..1ea9fba 100644
--- a/examples/schedulers/qt.py
+++ b/examples/schedulers/qt.py
@@ -30,7 +30,7 @@ if __name__ == '__main__':
label.show()
scheduler = QtScheduler()
- scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.add_job('interval', tick, seconds=3)
scheduler.start()
# Execution will block here until the user closes the windows or Ctrl+C is pressed.
diff --git a/examples/schedulers/tornado_.py b/examples/schedulers/tornado_.py
index 42ddab5..92d0ef1 100644
--- a/examples/schedulers/tornado_.py
+++ b/examples/schedulers/tornado_.py
@@ -14,7 +14,7 @@ def tick():
if __name__ == '__main__':
scheduler = TornadoScheduler()
- scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.add_job('interval', tick, seconds=3)
scheduler.start()
print('Press Ctrl+C to exit')
diff --git a/examples/schedulers/twisted_.py b/examples/schedulers/twisted_.py
index a1f7358..33a8065 100644
--- a/examples/schedulers/twisted_.py
+++ b/examples/schedulers/twisted_.py
@@ -14,7 +14,7 @@ def tick():
if __name__ == '__main__':
scheduler = TwistedScheduler()
- scheduler.add_job(tick, 'interval', {'seconds': 3})
+ scheduler.add_job('interval', tick, seconds=3)
scheduler.start()
print('Press Ctrl+C to exit')
diff --git a/setup.py b/setup.py
index e0516be..8e964c7 100644
--- a/setup.py
+++ b/setup.py
@@ -19,6 +19,10 @@ class PyTest(TestCommand):
errno = pytest.main(self.test_args)
sys.exit(errno)
+extra_requirements = []
+if sys.version_info < (3, 2):
+ extra_requirements.append('futures')
+
here = os.path.dirname(__file__)
readme_path = os.path.join(here, 'README.rst')
readme = open(readme_path).read()
@@ -45,7 +49,7 @@ setup(
keywords='scheduling cron',
license='MIT',
packages=find_packages(exclude=['tests']),
- install_requires=['six', 'python-dateutil'],
+ install_requires=['six', 'python-dateutil'] + extra_requirements,
tests_require=['pytest >= 2.5.1', 'pytest-cov'],
cmdclass={'test': PyTest},
zip_safe=False,
diff --git a/tests/conftest.py b/tests/conftest.py
index 3c1074a..1f72d9d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,14 +1,22 @@
+from datetime import datetime
import sys
+from dateutil.tz import tzoffset
import pytest
+from apscheduler.job import Job
+
+try:
+ from unittest.mock import Mock
+except ImportError:
+ from mock import Mock
+
def minpython(*version):
version_str = '.'.join([str(num) for num in version])
def outer(func):
- dec = pytest.mark.skipif(sys.version_info < version,
- reason='This test requires at least Python %s' % version_str)
+ dec = pytest.mark.skipif(sys.version_info < version, reason='Requires Python >= %s' % version_str)
return dec(func)
return outer
@@ -17,7 +25,57 @@ def maxpython(*version):
version_str = '.'.join([str(num) for num in version])
def outer(func):
- dec = pytest.mark.skipif(sys.version_info >= version,
- reason='This test should not be run on Python version %s or above' % version_str)
+ dec = pytest.mark.skipif(sys.version_info >= version, reason='Requires Python < %s' % version_str)
return dec(func)
return outer
+
+
+@pytest.fixture(scope='session')
+def timezone():
+ return tzoffset('DUMMYTZ', 3600)
+
+
+@pytest.fixture
+def freeze_time(monkeypatch, timezone):
+ class TimeFreezer:
+ def __init__(self, initial):
+ self.current = initial
+ self.increment = None
+
+ def get(self, tzinfo):
+ now = self.current.astimezone(tzinfo)
+ if self.increment:
+ self.current += self.increment
+ return now
+
+ def set(self, new_time):
+ self.current = new_time
+
+ def next(self):
+ return self.current + self.increment
+
+ def set_increment(self, delta):
+ self.increment = delta
+
+ freezer = TimeFreezer(datetime(2011, 4, 3, 18, 40, tzinfo=timezone))
+ fake_datetime = Mock(datetime, now=freezer.get)
+ monkeypatch.setattr('apscheduler.schedulers.base.datetime', fake_datetime)
+ monkeypatch.setattr('apscheduler.executors.pool.datetime', fake_datetime)
+ return freezer
+
+
+@pytest.fixture(scope='session')
+def job_defaults(timezone):
+ run_date = datetime(2011, 4, 3, 18, 40, tzinfo=timezone)
+ return {'trigger': 'date', 'trigger_args': {'run_date': run_date, 'timezone': timezone}, 'executor': 'default',
+ 'args': (), 'kwargs': {}, 'id': 'testid', 'misfire_grace_time': 1, 'coalesce': False, 'name': None,
+ 'max_runs': None, 'max_instances': 1}
+
+
+@pytest.fixture(scope='session')
+def create_job(job_defaults):
+ def create(**kwargs):
+ job_kwargs = job_defaults.copy()
+ job_kwargs.update(kwargs)
+ return Job(**job_kwargs)
+ return create
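[Note: a hypothetical test, not part of this commit, showing how the freeze_time fixture above drives the patched clock — reading it returns the current instant, then advances the clock by the configured increment:]

    from datetime import timedelta

    def test_clock_advances(freeze_time, timezone):
        start = freeze_time.current
        freeze_time.set_increment(timedelta(seconds=1))
        assert freeze_time.get(timezone) == start                    # read returns "now"...
        assert freeze_time.current == start + timedelta(seconds=1)   # ...then the clock ticks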
diff --git a/tests/test_executors.py b/tests/test_executors.py
new file mode 100644
index 0000000..e16897b
--- /dev/null
+++ b/tests/test_executors.py
@@ -0,0 +1,46 @@
+import time
+
+import pytest
+
+from apscheduler.executors.base import MaxInstancesReachedError
+from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
+
+try:
+ from unittest.mock import Mock, MagicMock
+except ImportError:
+ from mock import Mock, MagicMock
+
+
+@pytest.fixture
+def scheduler():
+ scheduler_ = Mock([])
+ scheduler_._create_lock = MagicMock()
+ return scheduler_
+
+
+@pytest.fixture(params=[ThreadPoolExecutor, ProcessPoolExecutor], ids=['threadpool', 'processpool'])
+def executor(request, scheduler):
+ executor_ = request.param(scheduler)
+ request.addfinalizer(executor_.shutdown)
+ return executor_
+
+
+def wait_event():
+ time.sleep(0.2)
+ return 'test'
+
+
+def test_max_instances(scheduler, executor, create_job, freeze_time):
+ """Tests that the maximum instance limit on a job is respected."""
+
+ events = []
+ scheduler._notify_listeners = lambda event: events.append(event)
+ job = create_job(func=wait_event, max_instances=2, max_runs=3)
+ executor.submit_job(job, [freeze_time.current])
+ executor.submit_job(job, [freeze_time.current])
+
+ pytest.raises(MaxInstancesReachedError, executor.submit_job, job, [freeze_time.current])
+ executor.shutdown()
+ assert len(events) == 2
+ assert events[0].retval == 'test'
+ assert events[1].retval == 'test'
diff --git a/tests/test_job.py b/tests/test_job.py
index 5d38032..fee6c4e 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -4,7 +4,7 @@ from dateutil.tz import tzoffset
import pytest
import six
-from apscheduler.job import Job, MaxInstancesReachedError, JobHandle
+from apscheduler.job import Job, JobHandle
from apscheduler.triggers.date import DateTrigger
from tests.conftest import maxpython
@@ -16,7 +16,7 @@ except ImportError:
local_tz = tzoffset('DUMMYTZ', 3600)
run_time = datetime(2010, 12, 13, 0, 8, 0, tzinfo=local_tz)
-job_defaults = {'trigger': 'date', 'trigger_args': {'run_date': run_time, 'timezone': local_tz},
+job_defaults = {'trigger': 'date', 'executor': 'default', 'trigger_args': {'run_date': run_time, 'timezone': local_tz},
'func': '%s:dummyfunc' % __name__, 'args': (), 'kwargs': {}, 'id': 'testid', 'misfire_grace_time': 1,
'coalesce': False, 'name': None, 'max_runs': None, 'max_instances': 1}
@@ -75,17 +75,18 @@ class TestJob(object):
def test_getstate(self, trigger):
job = create_job(trigger=trigger, trigger_args=None)
state = job.__getstate__()
- assert state == dict(version=1, trigger=trigger, func='tests.test_job:dummyfunc', name='dummyfunc', args=(),
- kwargs={}, id='testid', misfire_grace_time=1, coalesce=False, max_runs=None,
- max_instances=1, runs=0, next_run_time=None)
+ assert state == dict(version=1, trigger=trigger, executor='default', func='tests.test_job:dummyfunc',
+ name='dummyfunc', args=(), kwargs={}, id='testid', misfire_grace_time=1, coalesce=False,
+ max_runs=None, max_instances=1, runs=0, next_run_time=None)
def test_setstate(self, job):
trigger = DateTrigger(local_tz, '2010-12-14 13:05:00')
- state = dict(version=1, trigger=trigger, func='tests.test_job:dummyfunc', name='testjob.dummyfunc',
- args=[], kwargs={}, id='other_id', misfire_grace_time=2, max_runs=2, coalesce=True,
- max_instances=2, runs=1, next_run_time=None)
+ state = dict(version=1, trigger=trigger, executor='dummyexecutor', func='tests.test_job:dummyfunc',
+ name='testjob.dummyfunc', args=[], kwargs={}, id='other_id', misfire_grace_time=2, max_runs=2,
+ coalesce=True, max_instances=2, runs=1, next_run_time=None)
job.__setstate__(state)
assert job.id == 'other_id'
+ assert job.executor == 'dummyexecutor'
assert job.trigger == trigger
assert job.func == dummyfunc
assert job.max_runs == 2
@@ -105,28 +106,6 @@ class TestJob(object):
assert not job == 'bleh'
- def test_instances(self, job):
- job.max_instances = 2
- assert job.instances == 0
-
- job.add_instance()
- assert job.instances == 1
-
- job.add_instance()
- assert job.instances == 2
-
- with pytest.raises(MaxInstancesReachedError):
- job.add_instance()
-
- job.remove_instance()
- assert job.instances == 1
-
- job.remove_instance()
- assert job.instances == 0
-
- with pytest.raises(AssertionError):
- job.remove_instance()
-
def test_job_repr(self, job):
assert repr(job) == '<Job (id=testid)>'
diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py
index a899eba..5c87c94 100644
--- a/tests/test_jobstores.py
+++ b/tests/test_jobstores.py
@@ -1,7 +1,6 @@
from datetime import datetime
import os
-from dateutil.tz import tzoffset
import pytest
from apscheduler.jobstores.memory import MemoryJobStore
@@ -9,10 +8,10 @@ from apscheduler.jobstores.base import JobLookupError, ConflictingIdError, Trans
from apscheduler.triggers.date import DateTrigger
from apscheduler.job import Job
-local_tz = tzoffset('DUMMYTZ', 3600)
+#local_tz = tzoffset('DUMMYTZ', 3600)
run_time = datetime(2999, 1, 1)
-job_defaults = {'args': (), 'kwargs': {}, 'misfire_grace_time': 1, 'coalesce': False, 'name': None, 'max_runs': None,
- 'max_instances': 1}
+#job_defaults = {'args': (), 'kwargs': {}, 'misfire_grace_time': 1, 'coalesce': False, 'name': None, 'max_runs': None,
+# 'max_instances': 1}
def dummy_job():
@@ -36,7 +35,8 @@ def memjobstore(request):
def sqlalchemyjobstore(request):
def finish():
store.close()
- os.remove('tempdb.sqlite')
+ if os.path.exists('tempdb.sqlite'):
+ os.remove('tempdb.sqlite')
sqlalchemy = pytest.importorskip('apscheduler.jobstores.sqlalchemy')
store = sqlalchemy.SQLAlchemyJobStore(url='sqlite:///tempdb.sqlite')
@@ -57,72 +57,74 @@ def mongodbjobstore(request):
return store
-@pytest.fixture
+@pytest.fixture(params=[memjobstore, sqlalchemyjobstore, mongodbjobstore], ids=['memory', 'sqlalchemy', 'mongodb'])
def jobstore(request):
return request.param(request)
-def create_job(jobstore, func=dummy_job, trigger_date=run_time, id=None):
- trigger_date.replace(tzinfo=local_tz)
- trigger = DateTrigger(local_tz, trigger_date)
- job = Job(trigger=trigger, func=func, id=id, next_run_time=trigger.run_date, **job_defaults)
- jobstore.add_job(job)
- return job
-
-persistent_jobstores = [sqlalchemyjobstore, mongodbjobstore]
-persistent_jobstores_ids = ['sqlalchemy', 'mongodb']
-all_jobstores = [memjobstore] + persistent_jobstores
-all_jobstores_ids = ['memory'] + persistent_jobstores_ids
+@pytest.fixture(params=[sqlalchemyjobstore, mongodbjobstore], ids=['sqlalchemy', 'mongodb'])
+def persistent_jobstore(request):
+ return request.param(request)
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_lookup_job(jobstore):
+@pytest.fixture
+def create_job(timezone, job_defaults):
+ def create(jobstore, func=dummy_job, trigger_date=run_time, id=None):
+ trigger_date = trigger_date.replace(tzinfo=timezone)
+ trigger = DateTrigger(timezone, trigger_date)
+ job_kwargs = job_defaults.copy()
+ job_kwargs['func'] = func
+ job_kwargs['trigger'] = trigger
+ job_kwargs['id'] = id
+ job = Job(**job_kwargs)
+ job.next_run_time = job.trigger.get_next_fire_time(trigger_date)
+ jobstore.add_job(job)
+ return job
+
+ return create
+
+
+def test_lookup_job(jobstore, create_job):
initial_job = create_job(jobstore)
job = jobstore.lookup_job(initial_job.id)
assert job == initial_job
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
def test_lookup_nonexistent_job(jobstore):
pytest.raises(JobLookupError, jobstore.lookup_job, 'foo')
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_get_all_jobs(jobstore):
+def test_get_all_jobs(jobstore, create_job):
job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
job2 = create_job(jobstore, dummy_job2, datetime(2013, 8, 14))
jobs = jobstore.get_all_jobs()
assert jobs == [job2, job1]
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_get_pending_jobs(jobstore):
+def test_get_pending_jobs(jobstore, create_job, timezone):
job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
job3 = create_job(jobstore, dummy_job3, datetime(2013, 8, 14))
- jobs, next_run_time = jobstore.get_pending_jobs(datetime(2014, 2, 27, tzinfo=local_tz))
+ jobs, next_run_time = jobstore.get_pending_jobs(datetime(2014, 2, 27, tzinfo=timezone))
assert jobs == [job3, job2]
assert next_run_time == job1.trigger.run_date
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_get_pending_jobs_no_next(jobstore):
+def test_get_pending_jobs_no_next(jobstore, create_job, timezone):
job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
job3 = create_job(jobstore, dummy_job3, datetime(2013, 8, 14))
- jobs, next_run_time = jobstore.get_pending_jobs(datetime(2016, 5, 10, tzinfo=local_tz))
+ jobs, next_run_time = jobstore.get_pending_jobs(datetime(2016, 5, 10, tzinfo=timezone))
assert jobs == [job3, job2, job1]
assert next_run_time is None
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_add_job_conflicting_id(jobstore):
+def test_add_job_conflicting_id(jobstore, create_job):
create_job(jobstore, dummy_job, datetime(2016, 5, 3), id='blah')
pytest.raises(ConflictingIdError, create_job, jobstore, dummy_job2, datetime(2014, 2, 26), id='blah')
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_update_job_id_and_others(jobstore):
+def test_update_job_id_and_others(jobstore, create_job):
job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
jobstore.modify_job(job1.id, {'id': 'foo', 'max_instances': 6})
@@ -134,20 +136,18 @@ def test_update_job_id_and_others(jobstore):
assert jobs[1].max_instances == 6
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_update_job_next_runtime(jobstore):
+def test_update_job_next_runtime(jobstore, create_job, timezone):
job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
job3 = create_job(jobstore, dummy_job3, datetime(2013, 8, 14))
- jobstore.modify_job(job1.id, {'next_run_time': datetime(2014, 1, 3, tzinfo=local_tz)})
+ jobstore.modify_job(job1.id, {'next_run_time': datetime(2014, 1, 3, tzinfo=timezone)})
jobs = jobstore.get_all_jobs()
assert len(jobs) == 3
assert jobs == [job3, job1, job2]
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_update_job_next_runtime_empty(jobstore):
+def test_update_job_next_runtime_empty(jobstore, create_job):
job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
jobstore.modify_job(job1.id, {'next_run_time': None})
@@ -157,38 +157,33 @@ def test_update_job_next_runtime_empty(jobstore):
assert jobs == [job1, job2]
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_update_job_conflicting_id(jobstore):
+def test_update_job_conflicting_id(jobstore, create_job):
job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
pytest.raises(ConflictingIdError, jobstore.modify_job, job2.id, {'id': job1.id})
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
def test_update_job_nonexistent_job(jobstore):
pytest.raises(JobLookupError, jobstore.modify_job, 'foo', {'next_run_time': None})
-@pytest.mark.parametrize('jobstore', persistent_jobstores, indirect=True, ids=persistent_jobstores_ids)
-def test_one_job_fails_to_load(jobstore, monkeypatch):
- job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
- job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
- job3 = create_job(jobstore, dummy_job3, datetime(2013, 8, 14))
+def test_one_job_fails_to_load(persistent_jobstore, create_job, monkeypatch):
+ job1 = create_job(persistent_jobstore, dummy_job, datetime(2016, 5, 3))
+ job2 = create_job(persistent_jobstore, dummy_job2, datetime(2014, 2, 26))
+ job3 = create_job(persistent_jobstore, dummy_job3, datetime(2013, 8, 14))
# Make the dummy_job2 function disappear
monkeypatch.delitem(globals(), 'dummy_job2')
- jobs = jobstore.get_all_jobs()
+ jobs = persistent_jobstore.get_all_jobs()
assert jobs == [job3, job1]
-@pytest.mark.parametrize('jobstore', persistent_jobstores, indirect=True, ids=persistent_jobstores_ids)
-def test_transient_job_error(jobstore):
- pytest.raises(TransientJobError, create_job, jobstore, lambda: None, datetime(2016, 5, 3))
+def test_transient_job_error(persistent_jobstore, create_job):
+ pytest.raises(TransientJobError, create_job, persistent_jobstore, lambda: None, datetime(2016, 5, 3))
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_remove_job(jobstore):
+def test_remove_job(jobstore, create_job):
job1 = create_job(jobstore, dummy_job, datetime(2016, 5, 3))
job2 = create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
@@ -201,13 +196,11 @@ def test_remove_job(jobstore):
assert jobs == []
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
def test_remove_nonexistent_job(jobstore):
pytest.raises(JobLookupError, jobstore.remove_job, 'blah')
-@pytest.mark.parametrize('jobstore', all_jobstores, indirect=True, ids=all_jobstores_ids)
-def test_remove_all_jobs(jobstore):
+def test_remove_all_jobs(jobstore, create_job):
create_job(jobstore, dummy_job, datetime(2016, 5, 3))
create_job(jobstore, dummy_job2, datetime(2014, 2, 26))
@@ -228,7 +221,7 @@ def test_repr_mongodbjobstore(mongodbjobstore):
assert repr(mongodbjobstore) == "<MongoDBJobStore (connection=Connection('localhost', 27017))>"
-def test_memstore_close(memjobstore):
+def test_memstore_close(memjobstore, create_job):
create_job(memjobstore, dummy_job, datetime(2016, 5, 3))
memjobstore.close()
assert not memjobstore.get_all_jobs()
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 4802232..15f6370 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -1,22 +1,21 @@
from datetime import datetime, timedelta
from logging import StreamHandler, ERROR
-from threading import Event, Thread
+from concurrent.futures import Future
+from threading import Thread
from copy import copy
-from time import sleep
import os
-from dateutil.tz import tzoffset
import pytest
+from apscheduler.executors.pool import BasePoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
-from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.base import BaseScheduler
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import (SchedulerEvent, EVENT_JOB_EXECUTED, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN,
EVENT_JOB_MISSED, EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_JOB_ADDED,
EVENT_JOBSTORE_JOB_REMOVED, EVENT_JOBSTORE_JOB_MODIFIED)
-from apscheduler.schedulers.blocking import BlockingScheduler
-from apscheduler.threadpool import ThreadPool
from tests.conftest import minpython
try:
@@ -29,15 +28,20 @@ try:
except ImportError:
from mock import MagicMock
-dummy_tz = tzoffset('DUMMYTZ', 3600)
-dummy_datetime = datetime(2011, 4, 3, 18, 40, tzinfo=dummy_tz)
+class DummyPoolExecutor(object):
+ def submit(self, func, *args, **kwargs):
+ f = Future()
+ try:
+ retval = func(*args, **kwargs)
+ except Exception as e:
+ f.set_exception(e)
+ else:
+ f.set_result(retval)
-class DummyThreadPool(object):
- def submit(self, func, *args, **kwargs):
- func(*args, **kwargs)
+ return f
- def shutdown(self, wait):
+ def shutdown(self, wait=True):
pass
@@ -46,9 +50,9 @@ class DummyException(Exception):
class DummyScheduler(BaseScheduler):
- def __init__(self, gconfig={}, **options):
- super(DummyScheduler, self).__init__(gconfig, timezone=dummy_tz, threadpool=DummyThreadPool(), **options)
- self.now = dummy_datetime
+ def __init__(self, timezone, gconfig={}, **options):
+ super(DummyScheduler, self).__init__(gconfig, timezone=timezone, **options)
+ self.add_executor(BasePoolExecutor(self, DummyPoolExecutor()), 'default')
def start(self):
super(DummyScheduler, self).start()
@@ -56,20 +60,20 @@ class DummyScheduler(BaseScheduler):
def shutdown(self, wait=True):
super(DummyScheduler, self).shutdown()
+ def _create_default_executor(self):
+ return DummyPoolExecutor()
+
def _wakeup(self):
pass
- def _current_time(self):
- return self.now
-
def increment(vals):
vals[0] += 1
@pytest.fixture
-def scheduler(request):
- sched = DummyScheduler()
+def scheduler(request, freeze_time, timezone):
+ sched = DummyScheduler(timezone)
if 'start_scheduler' in request.keywords:
sched.start()
request.addfinalizer(lambda: sched.shutdown() if sched.running else None)
@@ -208,7 +212,7 @@ class TestOfflineScheduler(object):
@pytest.mark.start_scheduler
class TestRunningScheduler(object):
- def test_add_job_object(self, scheduler):
+ def test_add_job_object(self, scheduler, freeze_time):
"""Tests that any callable object is accepted (and not just functions)."""
class A(object):
@@ -220,11 +224,11 @@ class TestRunningScheduler(object):
a = A()
job = scheduler.add_job('interval', a, seconds=1)
- scheduler.now = job.next_run_time
+ freeze_time.set(job.next_run_time)
scheduler._process_jobs()
assert a.val == 1
- def test_add_job_method(self, scheduler):
+ def test_add_job_method(self, scheduler, freeze_time):
"""Tests that bound methods can be scheduled (at least with MemoryJobStore)."""
class A(object):
@@ -236,7 +240,7 @@ class TestRunningScheduler(object):
a = A()
job = scheduler.add_job('interval', a.method, seconds=1)
- scheduler.now = job.next_run_time
+ freeze_time.set(job.next_run_time)
scheduler._process_jobs()
assert a.val == 1
@@ -283,7 +287,7 @@ class TestRunningScheduler(object):
scheduler._process_jobs()
assert vals[0] == 1
- def test_remove_all_jobs(self, scheduler):
+ def test_remove_all_jobs(self, scheduler, freeze_time):
"""Tests that removing all jobs clears all job stores."""
vals = [0]
@@ -303,20 +307,20 @@ class TestRunningScheduler(object):
scheduler.remove_all_jobs('default')
assert scheduler.get_jobs() == [job2]
- def test_job_finished(self, scheduler):
+ def test_job_finished(self, scheduler, freeze_time):
vals = [0]
job = scheduler.add_job('interval', increment, args=(vals,), max_runs=1)
- scheduler.now = job.next_run_time
+ freeze_time.set(job.next_run_time)
scheduler._process_jobs()
assert vals == [1]
assert job not in scheduler.get_jobs()
- def test_job_exception(self, scheduler, logstream):
+ def test_job_exception(self, scheduler, freeze_time, logstream):
def failure():
raise DummyException
job = scheduler.add_job('date', failure, run_date=datetime(9999, 9, 9))
- scheduler.now = job.next_run_time
+ freeze_time.set(job.next_run_time)
scheduler._process_jobs()
assert 'DummyException' in logstream.getvalue()
@@ -328,17 +332,17 @@ class TestRunningScheduler(object):
job = scheduler.add_job('interval', lambda: None, seconds=1, misfire_grace_time=2)
assert job.misfire_grace_time == 2
- def test_coalesce_on(self, scheduler):
+ def test_coalesce_on(self, scheduler, freeze_time):
"""Tests that the job is only executed once when it is scheduled to be executed twice in a row."""
vals = [0]
events = []
scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
- job = scheduler.add_job('interval', increment, seconds=1, start_date=dummy_datetime, args=(vals,),
+ job = scheduler.add_job('interval', increment, seconds=1, start_date=freeze_time.current, args=(vals,),
coalesce=True, misfire_grace_time=2)
# Turn the clock 2 seconds forward
- scheduler.now += timedelta(seconds=2)
+ freeze_time.set(freeze_time.current + timedelta(seconds=2))
scheduler._process_jobs()
job.refresh()
@@ -347,7 +351,7 @@ class TestRunningScheduler(object):
assert events[0].code == EVENT_JOB_EXECUTED
assert vals == [1]
- def test_coalesce_off(self, scheduler):
+ def test_coalesce_off(self, scheduler, freeze_time):
"""Tests that every scheduled run for the job is executed even when they are in the past
(but still within misfire_grace_time).
"""
@@ -355,11 +359,11 @@ class TestRunningScheduler(object):
vals = [0]
events = []
scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
- job = scheduler.add_job('interval', increment, seconds=1, start_date=dummy_datetime, args=(vals,),
+ job = scheduler.add_job('interval', increment, seconds=1, start_date=freeze_time.current, args=(vals,),
coalesce=False, misfire_grace_time=2)
# Turn the clock 2 seconds forward
- scheduler.now += timedelta(seconds=2)
+ freeze_time.set(freeze_time.current + timedelta(seconds=2))
scheduler._process_jobs()
job.refresh()
@@ -397,55 +401,24 @@ class TestRunningScheduler(object):
pytest.raises(KeyError, scheduler.remove_jobstore, 'dummy2')
- def test_job_next_run_time(self, scheduler):
+ def test_job_next_run_time(self, scheduler, freeze_time):
"""Tests against bug #5."""
vals = [0]
- job = scheduler.add_job('interval', increment, seconds=1, start_date=dummy_datetime, args=(vals,),
+ job = scheduler.add_job('interval', increment, seconds=1, start_date=freeze_time.current, args=(vals,),
misfire_grace_time=3)
- scheduler.now = job.next_run_time
+ freeze_time.set(job.next_run_time)
scheduler._process_jobs()
assert vals == [1]
scheduler._process_jobs()
assert vals == [1]
- scheduler.now = job.next_run_time + timedelta(seconds=1)
+ freeze_time.set(job.next_run_time + timedelta(seconds=1))
scheduler._process_jobs()
assert vals == [2]
- def test_max_instances(self, scheduler):
- """Tests that the maximum instance limit on a job is respected and that missed job events are dispatched when
- the job cannot be run due to the instance limitation.
- """
-
- def wait_event():
- vals[0] += 1
- event.wait(2)
-
- vals = [0]
- events = []
- event = Event()
- shutdown_event = Event()
- scheduler._threadpool = ThreadPool()
- scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
- scheduler.add_listener(lambda e: shutdown_event.set(), EVENT_SCHEDULER_SHUTDOWN)
- scheduler.add_job('interval', wait_event, seconds=1, start_date=dummy_datetime, max_instances=2, max_runs=4)
- for _ in range(4):
- scheduler._process_jobs()
- scheduler.now += timedelta(seconds=1)
- event.set()
- scheduler.shutdown()
- shutdown_event.wait(2)
-
- assert vals == [2]
- assert len(events) == 4
- assert events[0].code == EVENT_JOB_MISSED
- assert events[1].code == EVENT_JOB_MISSED
- assert events[2].code == EVENT_JOB_EXECUTED
- assert events[3].code == EVENT_JOB_EXECUTED
-
def test_scheduler_double_start(self, scheduler):
pytest.raises(SchedulerAlreadyRunningError, scheduler.start)
@@ -458,162 +431,205 @@ class TestRunningScheduler(object):
class SchedulerImplementationTestBase(object):
- @pytest.fixture
- def scheduler(self, request):
- sched = self.create_scheduler()
- request.addfinalizer(lambda: self.finish(sched))
- return sched
-
- def create_scheduler(self):
- raise NotImplementedError
-
- def create_event(self):
- return Event()
+ @pytest.fixture(autouse=True)
+ def executor(self, scheduler):
+ dummy = DummyPoolExecutor()
+ executor = BasePoolExecutor(scheduler, dummy)
+ scheduler.add_executor(executor)
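# DummyPoolExecutor is defined in the test suite outside this hunk; it stands
# in for a real concurrent.futures pool. A plausible minimal version runs
# submitted callables inline and hands back an already-completed Future
# (sketch under that assumption, not the actual test helper):
from concurrent.futures import Future

class DummyPoolExecutor(object):
    def submit(self, fn, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))   # run synchronously
        except BaseException as exc:
            future.set_exception(exc)
        return future

    def shutdown(self, wait=True):
        pass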
- def process_events(self):
- pass
-
- def finish(self, sched):
- pass
-
- def test_scheduler_implementation(self, scheduler):
- """Tests that starting the scheduler eventually calls _process_jobs()."""
-
- class TimeRoller(object):
- def __init__(self, start, step):
- self.now = start
- self.step = timedelta(seconds=step)
-
- def next(self):
- return self.now + self.step
+ @pytest.fixture
+ def start_scheduler(self, request, scheduler):
+ def cleanup():
+ if scheduler.running:
+ scheduler.shutdown()
- def __call__(self):
- now = self.now
- self.now = self.next()
- return now
+ request.addfinalizer(cleanup)
+ return scheduler.start
- events = []
- vals = [0]
- job_removed_event = self.create_event()
- shutdown_event = self.create_event()
- scheduler._threadpool = DummyThreadPool()
-
- # Test that pending jobs are added (and if due, executed) when the scheduler starts
- scheduler._current_time = time_roller = TimeRoller(dummy_datetime, 0.2)
- scheduler.add_listener(events.append)
- scheduler.add_listener(lambda e: job_removed_event.set(), EVENT_JOBSTORE_JOB_REMOVED)
- scheduler.add_job('date', increment, run_date=time_roller.next(), args=(vals,))
- scheduler.start()
- self.process_events()
- job_removed_event.wait(2)
- assert job_removed_event.is_set()
- assert vals[0] == 1
- assert len(events) == 5
- assert events[0].code == EVENT_JOBSTORE_ADDED
- assert events[1].code == EVENT_JOBSTORE_JOB_ADDED
- assert events[2].code == EVENT_SCHEDULER_START
- assert events[3].code == EVENT_JOB_EXECUTED
- assert events[4].code == EVENT_JOBSTORE_JOB_REMOVED
- del events[:]
- job_removed_event.clear()
-
- # Test that adding a job causes it to be executed after the specified delay
- job = scheduler.add_job('date', increment, run_date=time_roller.next() + time_roller.step * 2, args=(vals,))
- self.process_events()
- sleep(0.5)
- self.process_events()
- job_removed_event.wait(2)
- assert job_removed_event.is_set()
- assert vals[0] == 2
- assert len(events) == 3
- assert events[0].code == EVENT_JOBSTORE_JOB_ADDED
- assert events[1].code == EVENT_JOB_EXECUTED
- assert events[2].code == EVENT_JOBSTORE_JOB_REMOVED
- del events[:]
- job_removed_event.clear()
+ @pytest.fixture
+ def eventqueue(self, scheduler):
+ from six.moves.queue import Queue
+ events = Queue()
+ scheduler.add_listener(events.put)
+ return events
+
+ def wait_event(self, queue):
+ return queue.get(True, 1)
+
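# Design note: the eventqueue fixture and wait_event() replace the Event
# flags, sleep() calls and manual listener bookkeeping of the deleted
# test_scheduler_implementation; scheduler events are now asserted in the
# exact order they are emitted, with a 1-second timeout per event.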
+ def test_add_pending_job(self, scheduler, freeze_time, eventqueue, start_scheduler):
+ """Tests that pending jobs are added (and if due, executed) when the scheduler starts."""
+
+ freeze_time.set_increment(timedelta(seconds=0.2))
+ scheduler.add_job('date', lambda x, y: x + y, args=[1, 2], run_date=freeze_time.next())
+ start_scheduler()
+
+ assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
+ assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_JOB_ADDED
+ assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START
+ event = self.wait_event(eventqueue)
+ assert event.code == EVENT_JOB_EXECUTED
+ assert event.retval == 3
+ assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_JOB_REMOVED
+
+ def test_add_live_job(self, scheduler, freeze_time, eventqueue, start_scheduler):
+ """Tests that adding a job causes it to be executed after the specified delay."""
+
+ freeze_time.set_increment(timedelta(seconds=0.2))
+ start_scheduler()
+ assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
+ assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START
+
+ scheduler.add_job('date', lambda x, y: x + y, args=[1, 2],
+ run_date=freeze_time.next() + freeze_time.increment * 2)
+ assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_JOB_ADDED
+ event = self.wait_event(eventqueue)
+ assert event.code == EVENT_JOB_EXECUTED
+ assert event.retval == 3
+ assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_JOB_REMOVED
+
+ def test_shutdown(self, scheduler, eventqueue, start_scheduler):
+ """Tests that shutting down the scheduler emits the proper event."""
+
+ start_scheduler()
+ assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
+ assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_START
- # Test that shutting down the scheduler emits the proper event
- scheduler.add_listener(lambda e: shutdown_event.set(), EVENT_SCHEDULER_SHUTDOWN)
scheduler.shutdown()
- self.process_events()
- shutdown_event.wait(2)
- assert shutdown_event.is_set()
- assert len(events) == 1
- assert events[0].code == EVENT_SCHEDULER_SHUTDOWN
+ assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_SHUTDOWN
class TestBlockingScheduler(SchedulerImplementationTestBase):
- def create_scheduler(self):
- sched = BlockingScheduler()
- self.thread = Thread(target=sched.start)
- sched.start = self.thread.start
- return sched
+ @pytest.fixture
+ def scheduler(self):
+ return BlockingScheduler()
- def finish(self, sched):
- self.thread.join()
+ @pytest.fixture
+ def start_scheduler(self, request, scheduler):
+ def cleanup():
+ if scheduler.running:
+ scheduler.shutdown()
+ thread.join()
+
+ request.addfinalizer(cleanup)
+ thread = Thread(target=scheduler.start)
+ return thread.start
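# BlockingScheduler.start() runs the scheduler's main loop in the calling
# thread and only returns after shutdown(), which is why the fixture above
# starts it in a helper thread and join()s that thread during cleanup.
# Outside the test suite, typical usage is simply (a minimal sketch, using
# the add_job signature seen in the tests above):
from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print('tick')

sched = BlockingScheduler()
sched.add_job('interval', tick, seconds=1)
sched.start()   # blocks here; sched.shutdown() from elsewhere returns control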
class TestBackgroundScheduler(SchedulerImplementationTestBase):
- def create_scheduler(self):
+ @pytest.fixture
+ def scheduler(self):
return BackgroundScheduler()
class TestAsyncIOScheduler(SchedulerImplementationTestBase):
- def create_scheduler(self):
+ @pytest.fixture
+ def event_loop(self):
+ asyncio = pytest.importorskip('asyncio')
+ return asyncio.new_event_loop()
+
+ @pytest.fixture
+ def scheduler(self, event_loop):
asyncio = pytest.importorskip('apscheduler.schedulers.asyncio')
- sched = asyncio.AsyncIOScheduler()
- self.thread = Thread(target=sched._eventloop.run_forever)
- self.thread.start()
- return sched
+ return asyncio.AsyncIOScheduler(event_loop=event_loop)
+
+ @pytest.fixture
+ def start_scheduler(self, request, event_loop, scheduler):
+ def cleanup():
+ if scheduler.running:
+ event_loop.call_soon_threadsafe(scheduler.shutdown)
+ event_loop.call_soon_threadsafe(event_loop.stop)
+ thread.join()
- def finish(self, sched):
- sched._eventloop.call_soon_threadsafe(sched._eventloop.stop)
- self.thread.join()
+ event_loop.call_soon_threadsafe(scheduler.start)
+ request.addfinalizer(cleanup)
+ thread = Thread(target=event_loop.run_forever)
+ return thread.start
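# Note: the asyncio event loop runs in a helper thread here, so both startup
# and cleanup must go through event_loop.call_soon_threadsafe(); invoking
# scheduler.shutdown() or event_loop.stop() directly from the test thread
# would race with the running loop.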
class TestGeventScheduler(SchedulerImplementationTestBase):
- def create_scheduler(self):
+ @pytest.fixture
+ def scheduler(self):
gevent = pytest.importorskip('apscheduler.schedulers.gevent')
return gevent.GeventScheduler()
- def create_event(self):
+ @pytest.fixture
+ def calc_event(self):
from gevent.event import Event
return Event()
+ @pytest.fixture
+ def eventqueue(self, scheduler):
+ from gevent.queue import Queue
+ events = Queue()
+ scheduler.add_listener(events.put)
+ return events
+
class TestTornadoScheduler(SchedulerImplementationTestBase):
- def create_scheduler(self):
+ @pytest.fixture
+ def io_loop(self):
+ ioloop = pytest.importorskip('tornado.ioloop')
+ return ioloop.IOLoop()
+
+ @pytest.fixture
+ def scheduler(self, io_loop):
tornado = pytest.importorskip('apscheduler.schedulers.tornado')
- sched = tornado.TornadoScheduler()
- self.thread = Thread(target=sched._ioloop.start)
- self.thread.start()
- return sched
+ return tornado.TornadoScheduler(io_loop=io_loop)
+
+ @pytest.fixture
+ def start_scheduler(self, request, io_loop, scheduler):
+ def cleanup():
+ if scheduler.running:
+ io_loop.add_callback(scheduler.shutdown)
+ io_loop.add_callback(io_loop.stop)
+ thread.join()
- def finish(self, sched):
- sched._ioloop.add_callback(sched._ioloop.stop)
- self.thread.join()
+ io_loop.add_callback(scheduler.start)
+ request.addfinalizer(cleanup)
+ thread = Thread(target=io_loop.start)
+ return thread.start
class TestTwistedScheduler(SchedulerImplementationTestBase):
- def create_scheduler(self):
+ @pytest.fixture
+ def reactor(self):
+ selectreactor = pytest.importorskip('twisted.internet.selectreactor')
+ return selectreactor.SelectReactor()
+
+ @pytest.fixture
+ def scheduler(self, reactor):
twisted = pytest.importorskip('apscheduler.schedulers.twisted')
- sched = twisted.TwistedScheduler()
- self.thread = Thread(target=sched._reactor.run, args=(False,))
- self.thread.start()
- return sched
+ return twisted.TwistedScheduler(reactor=reactor)
+
+ @pytest.fixture
+ def start_scheduler(self, request, reactor, scheduler):
+ def cleanup():
+ if scheduler.running:
+ reactor.callFromThread(scheduler.shutdown)
+ reactor.callFromThread(reactor.stop)
+ thread.join()
- def finish(self, sched):
- sched._reactor.callFromThread(sched._reactor.stop)
- self.thread.join()
+ reactor.callFromThread(scheduler.start)
+ request.addfinalizer(cleanup)
+ thread = Thread(target=reactor.run, args=(False,))
+ return thread.start
+@pytest.mark.skip
class TestQtScheduler(SchedulerImplementationTestBase):
- def create_scheduler(self):
+ @pytest.fixture(scope='class')
+ def coreapp(self):
+ QtCore = pytest.importorskip('PySide.QtCore')
+ QtCore.QCoreApplication([])
+
+ @pytest.fixture
+ def scheduler(self, coreapp):
qt = pytest.importorskip('apscheduler.schedulers.qt')
- from PySide.QtCore import QCoreApplication
- QCoreApplication([])
return qt.QtScheduler()
- def process_events(self):
+ def wait_event(self, queue):
from PySide.QtCore import QCoreApplication
- QCoreApplication.processEvents()
+
+ while queue.empty():
+ QCoreApplication.processEvents()
+ return queue.get_nowait()
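# The Qt scheduler dispatches through the Qt event loop instead of a worker
# thread, so this override pumps QCoreApplication.processEvents() until the
# listener callback has pushed the next event onto the queue.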
diff --git a/tests/test_threadpool.py b/tests/test_threadpool.py
deleted file mode 100644
index 1cbe44f..0000000
--- a/tests/test_threadpool.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from threading import Event
-from time import sleep
-
-import pytest
-
-from apscheduler.threadpool import ThreadPool
-
-
-def test_threadpool():
- pool = ThreadPool(core_threads=2, keepalive=0)
- event1 = Event()
- event2 = Event()
- event3 = Event()
- pool.submit(event1.set)
- pool.submit(event2.set)
- pool.submit(event3.set)
- event1.wait(1)
- event2.wait(1)
- event3.wait(1)
- assert event1.isSet()
- assert event2.isSet()
- assert event3.isSet()
- sleep(0.3)
- assert repr(pool) == '<ThreadPool at %x; threads=2/20>' % id(pool)
-
- pool.shutdown()
- assert repr(pool) == '<ThreadPool at %x; threads=0/20>' % id(pool)
-
- # Make sure double shutdown is ok
- pool.shutdown()
-
- # Make sure one can't submit tasks to a thread pool that has been shut down
- pytest.raises(RuntimeError, pool.submit, event1.set)
-
-
-def test_threadpool_maxthreads():
- pool = ThreadPool(core_threads=2, max_threads=1)
- assert pool.max_threads == 2
-
- pool = ThreadPool(core_threads=2, max_threads=3)
- assert pool.max_threads == 3
-
- pool = ThreadPool(core_threads=0, max_threads=0)
- assert pool.max_threads == 1
-
-
-def test_threadpool_nocore():
- pool = ThreadPool(keepalive=0)
- event = Event()
- pool.submit(event.set)
- event.wait(1)
- assert event.isSet()
- sleep(1)
- assert repr(pool) == '<ThreadPool at %x; threads=0/20>' % id(pool)
diff --git a/tests/test_util.py b/tests/test_util.py
index 437c151..03e3c34 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -142,41 +142,6 @@ def test_timedelta_seconds():
assert seconds == 150
-def test_time_difference_positive():
- earlier = datetime(2008, 9, 1, second=3)
- later = datetime(2008, 9, 1, second=49)
- assert time_difference(later, earlier) == 46
-
-
-def test_time_difference_negative():
- earlier = datetime(2009, 4, 7, second=7)
- later = datetime(2009, 4, 7, second=56)
- assert time_difference(earlier, later) == -49
-
-
-class TestDSTTimeDifference(object):
- @pytest.fixture(scope='class', autouse=True)
- def timezone(self, request):
- def finish():
- del os.environ['TZ']
- time.tzset()
-
- if hasattr(time, 'tzset'):
- os.environ['TZ'] = 'Europe/Helsinki'
- time.tzset()
- request.addfinalizer(finish)
-
- def test_time_difference_daylight_1(self):
- earlier = datetime(2010, 3, 28, 2)
- later = datetime(2010, 3, 28, 4)
- assert time_difference(later, earlier) == 3600
-
- def test_time_difference_daylight_2(self):
- earlier = datetime(2010, 10, 31, 2)
- later = datetime(2010, 10, 31, 5)
- assert time_difference(later, earlier) == 14400
-
-
def test_datetime_ceil_round():
dateval = datetime(2009, 4, 7, 2, 10, 16, 4000)
correct_answer = datetime(2009, 4, 7, 2, 10, 17)
diff --git a/tox.ini b/tox.ini
index 5350ae4..76e68ad 100644
--- a/tox.ini
+++ b/tox.ini
@@ -6,7 +6,6 @@ deps=pytest
pytest-cov
sqlalchemy
pymongo
- redis
[testenv]
deps={[base]deps}
@@ -14,7 +13,8 @@ deps={[base]deps}
twisted
gevent
tornado
-commands=py.test []
+ trollius
+commands=py.test -rsx
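# (-rsx makes py.test print a short summary of (s)kipped and (x)failed tests,
# which helps here because many scheduler tests importorskip optional backends.)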
[testenv:py32]
deps={[testenv:py34]deps}
@@ -37,6 +37,7 @@ deps={[base]deps}
mock
tornado
twisted
+ trollius
[testenv:pep8]
deps=pep8