author     Selwin Ong <selwin.ong@gmail.com>  2018-10-27 16:45:18 +0700
committer  Selwin Ong <selwin.ong@gmail.com>  2018-10-27 16:45:18 +0700
commit     d1377efdf28e726a0020e8775f35a1f8ca35b86b (patch)
tree       b614c5471582e1e8d77e3bf1558856f0ce06d80b
parent     eaf598d73ce234791ee22ef82f7f095aece9f2a6 (diff)
parent     8bf3503cdd7f7c80746a31de3b28ed3dd5d63b49 (diff)
download   rq-d1377efdf28e726a0020e8775f35a1f8ca35b86b.tar.gz
Merge branch 'failed-job-registry' into v1.0
-rw-r--r--  .gitignore                   2
-rw-r--r--  .pytest_cache/README.md      8
-rw-r--r--  CHANGES.md                  12
-rw-r--r--  docs/docs/exceptions.md     36
-rw-r--r--  docs/docs/jobs.md           53
-rw-r--r--  docs/docs/workers.md        17
-rw-r--r--  rq/__init__.py               2
-rwxr-xr-x  rq/cli/cli.py               39
-rw-r--r--  rq/defaults.py               1
-rw-r--r--  rq/exceptions.py             4
-rw-r--r--  rq/handlers.py              12
-rw-r--r--  rq/job.py                   41
-rw-r--r--  rq/queue.py                 83
-rw-r--r--  rq/registry.py              87
-rw-r--r--  rq/worker.py                94
-rw-r--r--  tests/fixtures.py            6
-rw-r--r--  tests/test_cli.py          125
-rw-r--r--  tests/test_job.py           90
-rw-r--r--  tests/test_queue.py        171
-rw-r--r--  tests/test_registry.py     170
-rw-r--r--  tests/test_sentry.py         9
-rw-r--r--  tests/test_worker.py       117
22 files changed, 695 insertions, 484 deletions
diff --git a/.gitignore b/.gitignore
index ab695b6..90ba653 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,10 +10,10 @@
/dist
/build
.tox
+.pytest_cache/
.vagrant
Vagrantfile
.idea/
.coverage*
/.cache
-.pytest_cache/
diff --git a/.pytest_cache/README.md b/.pytest_cache/README.md
new file mode 100644
index 0000000..bb78ba0
--- /dev/null
+++ b/.pytest_cache/README.md
@@ -0,0 +1,8 @@
+# pytest cache directory #
+
+This directory contains data from pytest's cache plugin,
+which provides the `--lf` and `--ff` options, as well as the `cache` fixture.
+
+**Do not** commit this to version control.
+
+See [the docs](https://docs.pytest.org/en/latest/cache.html) for more information.
diff --git a/CHANGES.md b/CHANGES.md
index 080ec11..c3e783e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,15 @@
+### 1.0 (Not Yet Released)
+Backward incompatible changes:
+
+- `FailedQueue` has been replaced with `FailedJobRegistry`:
+ * `get_failed_queue()` function has been removed. Please use `FailedJobRegistry(queue=queue)` instead.
+ * `move_to_failed_queue()` has been removed.
+ * RQ now provides a mechanism to automatically clean up failed jobs. By default, failed jobs are kept for 1 year.
+
+- RQ's custom job exception handling mechanism has also changed slightly:
+ * RQ's default exception handling mechanism (moving jobs to `FailedJobRegistry`) can be disabled by doing `Worker(disable_default_exception_handler=True)`.
+ * Custom exception handlers are no longer executed in reverse order.
+
### 0.12.0 (2018-07-14)
- Added support for Python 3.7. Since `async` is a keyword in Python 3.7,
`Queue(async=False)` has been changed to `Queue(is_async=False)`. The `async`
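
The 1.0 notes above translate into roughly the following code. A minimal migration sketch, assuming a local Redis (nothing here is prescribed by the changelog itself):

{% highlight python %}
from redis import StrictRedis
from rq import Queue
from rq.registry import FailedJobRegistry

connection = StrictRedis()
queue = Queue('default', connection=connection)

# Pre-1.0: failed_queue = get_failed_queue(connection=connection)
registry = FailedJobRegistry(queue=queue)

print(len(registry))  # number of failed jobs currently retained
{% endhighlight %}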
diff --git a/docs/docs/exceptions.md b/docs/docs/exceptions.md
index f0d217d..5c68632 100644
--- a/docs/docs/exceptions.md
+++ b/docs/docs/exceptions.md
@@ -6,27 +6,25 @@ layout: docs
Jobs can fail due to exceptions occurring. When your RQ workers run in the
background, how do you get notified of these exceptions?
-## Default: the `failed` queue
+## Default: the `FailedJobRegistry`
-The default safety net for RQ is the `failed` queue. Every job that fails
-execution is stored in here, along with its exception information (type,
+The default safety net for RQ is the `FailedJobRegistry`. Every job that doesn't
+execute successfully is stored here, along with its exception information (type,
value, traceback). While this makes sure no failing jobs "get lost", it is
of no use for getting notified proactively about job failures.
-## Custom exception handlers
+## Custom Exception Handlers
-Starting from version 0.3.1, RQ supports registering custom exception
-handlers. This makes it possible to replace the default behaviour (sending
-the job to the `failed` queue) altogether, or to take additional steps when an
-exception occurs.
+RQ supports registering custom exception handlers. This makes it possible to
+inject your own error handling logic into your workers.
This is how you register custom exception handler(s) to an RQ worker:
{% highlight python %}
-from rq.handlers import move_to_failed_queue # RQ's default exception handler
+from exception_handlers import foo_handler, bar_handler
-w = Worker([q], exception_handlers=[my_handler, move_to_failed_queue])
+w = Worker([q], exception_handlers=[foo_handler, bar_handler])
...
{% endhighlight %}
@@ -48,12 +46,24 @@ def my_handler(job, *exc_info):
...
{% endhighlight %}
+Note that custom exception handlers are executed in addition to RQ's default
+error handling mechanism (i.e. moving failed jobs to `FailedJobRegistry`). If
+you want to disable RQ's default error handling mechanism, instantiate your
+worker with `disable_default_exception_handler=True`:
-## Chaining exception handlers
+{% highlight python %}
+from exception_handlers import foo_handler
+
+w = Worker([q], exception_handlers=[foo_handler],
+ disable_default_exception_handler=True)
+{% endhighlight %}
+
+
+## Chaining Exception Handlers
The handler itself is responsible for deciding whether the exception
handling is done, or should fall through to the next handler on the stack.
-The handler can indicate this by returning a boolean. `False` means stop
+The handler can indicate this by returning a boolean. `False` means stop
processing exceptions, `True` means continue and fall through to the next
exception handler on the stack.
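
As a minimal sketch in the spirit of the `black_hole` fixture in this repository's tests, a handler that stops the chain looks like:

{% highlight python %}
def black_hole(job, *exc_info):
    # Returning False stops the chain: no later handler (nor RQ's
    # default fall-through behaviour) runs for this job.
    return False
{% endhighlight %}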
@@ -61,7 +71,7 @@ It's important to know for implementors that, by default, when the handler
doesn't have an explicit return value (thus `None`), this will be interpreted
as `True` (i.e. continue with the next handler).
-To replace the default behaviour (i.e. moving the job to the `failed` queue),
+To prevent the next exception handler in the handler chain from executing,
use a custom exception handler that doesn't fall through, for example:
{% highlight python %}
diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md
index ec829ce..bb84c54 100644
--- a/docs/docs/jobs.md
+++ b/docs/docs/jobs.md
@@ -65,31 +65,52 @@ job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)
## Failed Jobs
-If a job fails and raises an exception, the worker will put the job in a failed job queue.
-On the Job instance, the `is_failed` property will be true. To fetch all failed jobs, scan
-through the `get_failed_queue()` queue.
+If a job fails during execution, the worker will put the job in a `FailedJobRegistry`.
+On the Job instance, the `is_failed` property will be `True`. A queue's `FailedJobRegistry`
+can be accessed through `queue.failed_job_registry`.
{% highlight python %}
from redis import StrictRedis
-from rq import push_connection, get_failed_queue, Queue
+from rq import Queue, Worker
from rq.job import Job
-con = StrictRedis()
-push_connection(con)
-
def div_by_zero(x):
return x / 0
-job = Job.create(func=div_by_zero, args=(1, 2, 3))
-job.origin = 'fake'
-job.save()
-fq = get_failed_queue()
-fq.quarantine(job, Exception('Some fake error'))
-assert fq.count == 1
-fq.requeue(job.id)
+connection = StrictRedis()
+queue = Queue(connection=connection)
+job = queue.enqueue(div_by_zero, 1)
+registry = queue.failed_job_registry
+
+worker = Worker([queue])
+worker.work(burst=True)
+
+assert len(registry) == 1 # Failed jobs are kept in FailedJobRegistry
+
+registry.requeue(job) # Puts job back in its original queue
+
+assert len(registry) == 0
+
+assert queue.count == 1
+{% endhighlight %}
+
+By default, failed jobs are kept for 1 year. You can change this by specifying
+`failure_ttl` (in seconds) when enqueueing jobs.
+
+{% highlight python %}
+job = queue.enqueue(foo_job, failure_ttl=300) # 5 minutes in seconds
+{% endhighlight %}
+
+## Requeueing Failed Jobs
+
+RQ also provides a CLI tool that makes requeueing failed jobs easy.
+
+{% highlight console %}
+# This will requeue foo_job_id and bar_job_id from myqueue's failed job registry
+rq requeue --queue myqueue -u redis://localhost:6379 foo_job_id bar_job_id
-assert fq.count == 0
-assert Queue('fake').count == 1
+# This command will requeue all jobs in myqueue's failed job registry
+rq requeue --queue myqueue -u redis://localhost:6379 --all
{% endhighlight %}
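
A programmatic equivalent of the CLI calls above, as a minimal sketch assuming a local Redis:

{% highlight python %}
from redis import StrictRedis
from rq import Queue

queue = Queue('myqueue', connection=StrictRedis())
registry = queue.failed_job_registry

# Requeue every job currently in the failed job registry:
for job_id in registry.get_job_ids():
    registry.requeue(job_id)
{% endhighlight %}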
diff --git a/docs/docs/workers.md b/docs/docs/workers.md
index e3d205e..f5ed79f 100644
--- a/docs/docs/workers.md
+++ b/docs/docs/workers.md
@@ -86,7 +86,7 @@ The life-cycle of a worker consists of a few phases:
7. _Cleanup job execution_. The worker sets its status to `idle` and sets both
the job and its result to expire based on `result_ttl`. The job is also removed
from `StartedJobRegistry` and added to `FinishedJobRegistry` in the case
- of successful execution, or `FailedQueue` in the case of failure.
+ of successful execution, or `FailedJobRegistry` in the case of failure.
8. _Loop_. Repeat from step 3.
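
A hedged illustration of the retention knobs involved in step 7 (`say_hello` is a stand-in; in practice the function must be importable by the worker):

{% highlight python %}
from redis import StrictRedis
from rq import Queue


def say_hello():
    return 'hello'


queue = Queue(connection=StrictRedis())
# Keep a successful result for 10 minutes, a failed job for 1 hour:
job = queue.enqueue(say_hello, result_ttl=600, failure_ttl=3600)
{% endhighlight %}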
@@ -243,8 +243,6 @@ $ rq worker -c settings
## Custom worker classes
-_New in version 0.4.0._
-
There are times when you want to customize the worker's behavior. Some of the
more common requests so far are:
@@ -262,8 +260,6 @@ $ rq worker -w 'path.to.GeventWorker'
## Custom Job and Queue classes
-_Will be available in next release._
-
You can tell the worker to use a custom class for jobs and queues using
`--job-class` and/or `--queue-class`.
@@ -294,15 +290,14 @@ queue.enqueue(some_func)
When a Job times-out, the worker will try to kill it using the supplied
`death_penalty_class` (default: `UnixSignalDeathPenalty`). This can be overridden
-if you wish to attempt to kill jobs in an application specific or 'cleaner' manner.
+if you wish to attempt to kill jobs in an application specific or 'cleaner' manner.
DeathPenalty classes are constructed with the following arguments:
`BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)`
-## Custom exception handlers
+## Custom Exception Handlers
-_New in version 0.5.5._
If you need to handle errors differently for different types of jobs, or simply want to customize
RQ's default error handling behavior, run `rq worker` using the `--exception-handler` option:
@@ -313,3 +308,9 @@ $ rq worker --exception-handler 'path.to.my.ErrorHandler'
# Multiple exception handlers is also supported
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'
{% endhighlight %}
+
+If you want to disable RQ's default exception handler, use the `--disable-default-exception-handler` option:
+
+{% highlight console %}
+$ rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler
+{% endhighlight %}
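
Relatedly, for the DeathPenalty classes described earlier on this page, here is a sketch of a custom penalty class. The `setup_death_penalty()`/`cancel_death_penalty()` hook names and the `_timeout` attribute are assumptions about `rq.timeouts` and should be verified against the source:

{% highlight python %}
from rq.timeouts import UnixSignalDeathPenalty


class VerboseDeathPenalty(UnixSignalDeathPenalty):
    """Sketch only: logs around the inherited SIGALRM-based timeout."""

    def setup_death_penalty(self):
        # _timeout is assumed to hold the job timeout in seconds.
        print('Arming %s second timeout' % self._timeout)
        super(VerboseDeathPenalty, self).setup_death_penalty()

    def cancel_death_penalty(self):
        super(VerboseDeathPenalty, self).cancel_death_penalty()
        print('Timeout disarmed')
{% endhighlight %}

Wiring it in, per the paragraph above, means supplying it as the worker's `death_penalty_class`.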
diff --git a/rq/__init__.py b/rq/__init__.py
index 0e55f1b..9ad8be1 100644
--- a/rq/__init__.py
+++ b/rq/__init__.py
@@ -6,7 +6,7 @@ from __future__ import (absolute_import, division, print_function,
from .connections import (Connection, get_current_connection, pop_connection,
push_connection, use_connection)
from .job import cancel_job, get_current_job, requeue_job
-from .queue import get_failed_queue, Queue
+from .queue import Queue
from .version import VERSION
from .worker import SimpleWorker, Worker
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 81ef810..70ea182 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -12,7 +12,7 @@ import sys
import click
from redis.exceptions import ConnectionError
-from rq import Connection, get_failed_queue, __version__ as version
+from rq import Connection, __version__ as version
from rq.cli.helpers import (read_config_file, refresh,
setup_loghandlers_from_args,
show_both, show_queues, show_workers, CliConfig)
@@ -23,6 +23,7 @@ from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
from rq.exceptions import InvalidJobOperationError
+from rq.registry import FailedJobRegistry
from rq.utils import import_attribute
from rq.suspension import (suspend as connection_suspend,
resume as connection_resume, is_suspended)
@@ -112,16 +113,16 @@ def empty(cli_config, all, queues, **options):
@main.command()
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
+@click.option('--queue', required=True, type=str)
@click.argument('job_ids', nargs=-1)
@pass_cli_config
-def requeue(cli_config, all, job_class, job_ids, **options):
+def requeue(cli_config, queue, all, job_class, job_ids, **options):
"""Requeue failed jobs."""
- failed_queue = get_failed_queue(connection=cli_config.connection,
- job_class=cli_config.job_class)
-
+ failed_job_registry = FailedJobRegistry(queue,
+ connection=cli_config.connection)
if all:
- job_ids = failed_queue.job_ids
+ job_ids = failed_job_registry.get_job_ids()
if not job_ids:
click.echo('Nothing to do')
@@ -132,12 +133,12 @@ def requeue(cli_config, all, job_class, job_ids, **options):
with click.progressbar(job_ids) as job_ids:
for job_id in job_ids:
try:
- failed_queue.requeue(job_id)
+ failed_job_registry.requeue(job_id)
except InvalidJobOperationError:
fail_count += 1
if fail_count > 0:
- click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red')
+ click.secho('Unable to requeue {0} jobs from failed job registry'.format(fail_count), fg='red')
@main.command()
@@ -185,13 +186,14 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN')
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path')
+@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, job_monitoring_interval, verbose, quiet, sentry_dsn,
- exception_handler, pid, queues, log_format, date_format, **options):
+ exception_handler, pid, disable_default_exception_handler, queues,
+ log_format, date_format, **options):
"""Starts an RQ worker."""
-
settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
queues = queues or settings.get('QUEUES', ['default'])
@@ -219,15 +221,14 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
connection=cli_config.connection,
job_class=cli_config.job_class)
for queue in queues]
- worker = cli_config.worker_class(queues,
- name=name,
- connection=cli_config.connection,
- default_worker_ttl=worker_ttl,
- default_result_ttl=results_ttl,
- job_monitoring_interval=job_monitoring_interval,
- job_class=cli_config.job_class,
- queue_class=cli_config.queue_class,
- exception_handlers=exception_handlers or None)
+ worker = cli_config.worker_class(
+ queues, name=name, connection=cli_config.connection,
+ default_worker_ttl=worker_ttl, default_result_ttl=results_ttl,
+ job_monitoring_interval=job_monitoring_interval,
+ job_class=cli_config.job_class, queue_class=cli_config.queue_class,
+ exception_handlers=exception_handlers or None,
+ disable_default_exception_handler=disable_default_exception_handler
+ )
# Should we configure Sentry?
if sentry_dsn:
diff --git a/rq/defaults.py b/rq/defaults.py
index 517144c..6b31a16 100644
--- a/rq/defaults.py
+++ b/rq/defaults.py
@@ -5,5 +5,6 @@ DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500
+DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
diff --git a/rq/exceptions.py b/rq/exceptions.py
index e9f58e0..684bfb0 100644
--- a/rq/exceptions.py
+++ b/rq/exceptions.py
@@ -15,6 +15,10 @@ class InvalidJobOperationError(Exception):
pass
+class InvalidJobOperation(Exception):
+ pass
+
+
class UnpickleError(Exception):
def __init__(self, message, raw_data, inner_exception=None):
super(UnpickleError, self).__init__(message, inner_exception)
diff --git a/rq/handlers.py b/rq/handlers.py
deleted file mode 100644
index 33ac03d..0000000
--- a/rq/handlers.py
+++ /dev/null
@@ -1,12 +0,0 @@
-import traceback
-
-from .connections import get_current_connection
-from .queue import get_failed_queue
-from .worker import Worker
-
-
-def move_to_failed_queue(job, *exc_info):
- """Default exception handler: move the job to the failed queue."""
- exc_string = Worker._get_safe_exception_string(traceback.format_exception(*exc_info))
- failed_queue = get_failed_queue(get_current_connection(), job.__class__)
- failed_queue.quarantine(job, exc_info=exc_string)
diff --git a/rq/job.py b/rq/job.py
index 9432813..0741d13 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -20,6 +20,7 @@ try:
except ImportError: # noqa # pragma: no cover
import pickle
+
# Serialize pickle dumps using the highest pickle protocol (binary, default
# uses ascii)
dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
@@ -62,16 +63,6 @@ def cancel_job(job_id, connection=None):
Job.fetch(job_id, connection=connection).cancel()
-def requeue_job(job_id, connection=None, job_class=None):
- """Requeues the job with the given job ID. If no such job exists, just
- remove the job ID from the failed queue, otherwise the job ID should refer
- to a failed job (i.e. it should be on the failed queue).
- """
- from .queue import get_failed_queue
- failed_queue = get_failed_queue(connection=connection, job_class=job_class)
- return failed_queue.requeue(job_id)
-
-
def get_current_job(connection=None, job_class=None):
"""Returns the Job instance that is currently being executed. If this
function is invoked from outside a job context, None is returned.
@@ -82,6 +73,11 @@ def get_current_job(connection=None, job_class=None):
return _job_stack.top
+def requeue_job(job_id, connection):
+ job = Job.fetch(job_id, connection=connection)
+ return job.requeue()
+
+
class Job(object):
"""A Job is just a convenient datastructure to pass around job (meta) data.
"""
@@ -91,7 +87,8 @@ class Job(object):
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, ttl=None, status=None, description=None,
- depends_on=None, timeout=None, id=None, origin=None, meta=None):
+ depends_on=None, timeout=None, id=None, origin=None, meta=None,
+ failure_ttl=None):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
"""
@@ -132,6 +129,7 @@ class Job(object):
# Extra meta data
job.description = description or job.get_call_string()
job.result_ttl = result_ttl
+ job.failure_ttl = failure_ttl
job.ttl = ttl
job.timeout = parse_timeout(timeout)
job._status = status
@@ -319,6 +317,7 @@ class Job(object):
self.exc_info = None
self.timeout = None
self.result_ttl = None
+ self.failure_ttl = None
self.ttl = None
self._status = None
self._dependency_id = None
@@ -443,6 +442,7 @@ class Job(object):
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.timeout = parse_timeout(as_text(obj.get('timeout'))) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
+ self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
self._dependency_id = as_text(obj.get('dependency_id', None))
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
@@ -456,7 +456,6 @@ class Job(object):
# Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info)
-
def to_dict(self, include_meta=True):
"""
Returns a serialization of the current job instance
@@ -489,6 +488,8 @@ class Job(object):
obj['timeout'] = self.timeout
if self.result_ttl is not None:
obj['result_ttl'] = self.result_ttl
+ if self.failure_ttl is not None:
+ obj['failure_ttl'] = self.failure_ttl
if self._status is not None:
obj['status'] = self._status
if self._dependency_id is not None:
@@ -535,11 +536,14 @@ class Job(object):
q.remove(self, pipeline=pipeline)
pipeline.execute()
+ def requeue(self):
+ """Requeues job."""
+        return self.failed_job_registry.requeue(self)
+
def delete(self, pipeline=None, remove_from_queue=True,
delete_dependents=False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well."""
-
if remove_from_queue:
self.cancel(pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection
@@ -566,10 +570,7 @@ class Job(object):
registry.remove(self, pipeline=pipeline)
elif self.get_status() == JobStatus.FAILED:
- from .queue import get_failed_queue
- failed_queue = get_failed_queue(connection=self.connection,
- job_class=self.__class__)
- failed_queue.remove(self, pipeline=pipeline)
+ self.failed_job_registry.remove(self, pipeline=pipeline)
if delete_dependents:
self.delete_dependents(pipeline=pipeline)
@@ -652,6 +653,12 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl)
+ @property
+ def failed_job_registry(self):
+ from .registry import FailedJobRegistry
+ return FailedJobRegistry(self.origin, connection=self.connection,
+ job_class=self.__class__)
+
def register_dependency(self, pipeline=None):
"""Jobs may have dependencies. Jobs are enqueued only if the job they
depend on is successfully performed. We record this relation as
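
A minimal sketch of the new requeue surface added to rq/job.py above (the job id is hypothetical, and the job must currently sit in its queue's `FailedJobRegistry`):

{% highlight python %}
from redis import StrictRedis
from rq.job import Job, requeue_job

connection = StrictRedis()

job = Job.fetch('my-failed-job-id', connection=connection)
job.requeue()  # delegates to job.failed_job_registry.requeue(job)

# The module-level helper with the new signature does the same by id:
# requeue_job('another-failed-job-id', connection=connection)
{% endhighlight %}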
diff --git a/rq/queue.py b/rq/queue.py
index 3aea012..d2fae1a 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -16,11 +16,6 @@ from .job import Job, JobStatus
from .utils import backend_class, import_attribute, utcnow, parse_timeout
-def get_failed_queue(connection=None, job_class=None):
- """Returns a handle to the special failed queue."""
- return FailedQueue(connection=connection, job_class=job_class)
-
-
def compact(lst):
return [item for item in lst if item is not None]
@@ -141,8 +136,7 @@ class Queue(object):
except NoSuchJobError:
self.remove(job_id)
else:
- if job.origin == self.name or \
- (job.is_failed and self == get_failed_queue(connection=self.connection, job_class=self.job_class)):
+ if job.origin == self.name:
return job
def get_job_ids(self, offset=0, length=-1):
@@ -175,6 +169,12 @@ class Queue(object):
"""Returns a count of all messages in the queue."""
return self.connection.llen(self.key)
+ @property
+ def failed_job_registry(self):
+ """Returns this queue's FailedJobRegistry."""
+ from rq.registry import FailedJobRegistry
+ return FailedJobRegistry(queue=self)
+
def remove(self, job_or_id, pipeline=None):
"""Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
@@ -210,8 +210,9 @@ class Queue(object):
connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
- result_ttl=None, ttl=None, description=None,
- depends_on=None, job_id=None, at_front=False, meta=None):
+ result_ttl=None, ttl=None, failure_ttl=None,
+ description=None, depends_on=None, job_id=None,
+ at_front=False, meta=None):
"""Creates a job to represent the delayed function call and enqueues
it.
@@ -221,13 +222,15 @@ class Queue(object):
"""
timeout = parse_timeout(timeout) or self._default_timeout
result_ttl = parse_timeout(result_ttl)
+ failure_ttl = parse_timeout(failure_ttl)
ttl = parse_timeout(ttl)
job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection,
- result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED,
- description=description, depends_on=depends_on,
- timeout=timeout, id=job_id, origin=self.name, meta=meta)
+ result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
+ status=JobStatus.QUEUED, description=description,
+ depends_on=depends_on, timeout=timeout, id=job_id,
+ origin=self.name, meta=meta)
# If job depends on an unfinished job, register itself on it's
# parent's dependents instead of enqueueing it.
@@ -295,6 +298,7 @@ class Queue(object):
description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None)
ttl = kwargs.pop('ttl', None)
+ failure_ttl = kwargs.pop('failure_ttl', None)
depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None)
at_front = kwargs.pop('at_front', False)
@@ -305,10 +309,12 @@ class Queue(object):
args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None)
- return self.enqueue_call(func=f, args=args, kwargs=kwargs,
- timeout=timeout, result_ttl=result_ttl, ttl=ttl,
- description=description, depends_on=depends_on,
- job_id=job_id, at_front=at_front, meta=meta)
+ return self.enqueue_call(
+ func=f, args=args, kwargs=kwargs, timeout=timeout,
+ result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
+ description=description, depends_on=depends_on, job_id=job_id,
+ at_front=at_front, meta=meta
+ )
def enqueue_job(self, job, pipeline=None, at_front=False):
"""Enqueues a job for delayed execution.
@@ -504,48 +510,3 @@ class Queue(object):
def __str__(self):
return '<{0} {1}>'.format(self.__class__.__name__, self.name)
-
-
-class FailedQueue(Queue):
- def __init__(self, connection=None, job_class=None):
- super(FailedQueue, self).__init__(JobStatus.FAILED,
- connection=connection,
- job_class=job_class)
-
- def quarantine(self, job, exc_info):
- """Puts the given Job in quarantine (i.e. put it on the failed
- queue).
- """
-
- with self.connection._pipeline() as pipeline:
- # Add Queue key set
- self.connection.sadd(self.redis_queues_keys, self.key)
-
- job.exc_info = exc_info
- job.save(pipeline=pipeline, include_meta=False)
- job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire
-
- self.push_job_id(job.id, pipeline=pipeline)
- pipeline.execute()
-
- return job
-
- def requeue(self, job_id):
- """Requeues the job with the given job ID."""
- try:
- job = self.job_class.fetch(job_id, connection=self.connection)
- except NoSuchJobError:
- # Silently ignore/remove this job and return (i.e. do nothing)
- self.remove(job_id)
- return
-
- # Delete it from the failed queue (raise an error if that failed)
- if self.remove(job) == 0:
- raise InvalidJobOperationError('Cannot requeue non-failed jobs')
-
- job.set_status(JobStatus.QUEUED)
- job.exc_info = None
- queue = Queue(job.origin,
- connection=self.connection,
- job_class=self.job_class)
- return queue.enqueue_job(job)
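
A minimal sketch of the Queue-level additions above, assuming a local Redis: `failure_ttl` flows through `enqueue()` into `Job.create()`, and `failed_job_registry` exposes the queue's registry:

{% highlight python %}
from redis import StrictRedis
from rq import Queue


def div_by_zero(x):
    return x / 0


queue = Queue(connection=StrictRedis())
# Kept in FailedJobRegistry for 5 minutes after failing:
job = queue.enqueue(div_by_zero, 1, failure_ttl=300)
registry = queue.failed_job_registry
{% endhighlight %}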
diff --git a/rq/registry.py b/rq/registry.py
index 2b8b992..5cfdd43 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -1,8 +1,9 @@
from .compat import as_text
from .connections import resolve_connection
-from .exceptions import NoSuchJobError
+from .defaults import DEFAULT_FAILURE_TTL
+from .exceptions import InvalidJobOperation, NoSuchJobError
from .job import Job, JobStatus
-from .queue import FailedQueue
+from .queue import Queue
from .utils import backend_class, current_timestamp
@@ -27,11 +28,20 @@ class BaseRegistry(object):
self.key = self.key_template.format(self.name)
self.job_class = backend_class(self, 'job_class', override=job_class)
-
def __len__(self):
"""Returns the number of jobs in this registry"""
return self.count
+ def __contains__(self, item):
+ """
+        Returns a boolean indicating whether the registry contains the given
+ job instance or job id.
+ """
+ job_id = item
+ if isinstance(item, self.job_class):
+ job_id = item.id
+ return self.connection.zscore(self.key, job_id) is not None
+
@property
def count(self):
"""Returns the number of jobs in this registry"""
@@ -69,6 +79,10 @@ class BaseRegistry(object):
return [as_text(job_id) for job_id in
self.connection.zrange(self.key, start, end)]
+ def get_queue(self):
+ """Returns Queue object associated with this registry."""
+ return Queue(self.name, connection=self.connection)
+
class StartedJobRegistry(BaseRegistry):
"""
@@ -82,7 +96,7 @@ class StartedJobRegistry(BaseRegistry):
key_template = 'rq:wip:{0}'
def cleanup(self, timestamp=None):
- """Remove expired jobs from registry and add them to FailedQueue.
+ """Remove expired jobs from registry and add them to FailedJobRegistry.
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
@@ -92,8 +106,7 @@ class StartedJobRegistry(BaseRegistry):
job_ids = self.get_expired_job_ids(score)
if job_ids:
- failed_queue = FailedQueue(connection=self.connection,
- job_class=self.job_class)
+ failed_job_registry = FailedJobRegistry(self.name, self.connection)
with self.connection.pipeline() as pipeline:
for job_id in job_ids:
@@ -103,7 +116,7 @@ class StartedJobRegistry(BaseRegistry):
job.set_status(JobStatus.FAILED)
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline)
- failed_queue.push_job_id(job_id, pipeline=pipeline)
+ failed_job_registry.add(job, job.failure_ttl)
except NoSuchJobError:
pass
@@ -131,6 +144,61 @@ class FinishedJobRegistry(BaseRegistry):
self.connection.zremrangebyscore(self.key, 0, score)
+class FailedJobRegistry(BaseRegistry):
+ """
+    Registry containing failed jobs.
+ """
+ key_template = 'rq:failed:{0}'
+
+ def cleanup(self, timestamp=None):
+ """Remove expired jobs from registry.
+
+ Removes jobs with an expiry time earlier than timestamp, specified as
+ seconds since the Unix epoch. timestamp defaults to call time if
+ unspecified.
+ """
+ score = timestamp if timestamp is not None else current_timestamp()
+ self.connection.zremrangebyscore(self.key, 0, score)
+
+ def add(self, job, ttl=None, exc_string='', pipeline=None):
+ """
+ Adds a job to a registry with expiry time of now + ttl.
+ `ttl` defaults to DEFAULT_FAILURE_TTL if not specified.
+ """
+ if ttl is None:
+ ttl = DEFAULT_FAILURE_TTL
+ score = ttl if ttl < 0 else current_timestamp() + ttl
+
+ if pipeline:
+ p = pipeline
+ else:
+ p = self.connection._pipeline()
+
+ job.exc_info = exc_string
+ job.save(pipeline=p, include_meta=False)
+ job.cleanup(ttl=-1, pipeline=p) # failed job won't expire
+ p.zadd(self.key, score, job.id)
+
+ if not pipeline:
+ p.execute()
+
+ def requeue(self, job_or_id):
+ """Requeues the job with the given job ID."""
+ if isinstance(job_or_id, self.job_class):
+ job = job_or_id
+ else:
+ job = self.job_class.fetch(job_or_id, connection=self.connection)
+
+ result = self.connection.zrem(self.key, job.id)
+ if not result:
+ raise InvalidJobOperation
+
+ queue = Queue(job.origin, connection=self.connection,
+ job_class=self.job_class)
+
+ return queue.enqueue_job(job)
+
+
class DeferredJobRegistry(BaseRegistry):
"""
Registry of deferred jobs (waiting for another job to finish).
@@ -154,3 +222,8 @@ def clean_registries(queue):
connection=queue.connection,
job_class=queue.job_class)
registry.cleanup()
+
+ registry = FailedJobRegistry(name=queue.name,
+ connection=queue.connection,
+ job_class=queue.job_class)
+ registry.cleanup()
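
A minimal sketch exercising the new registry behaviour above (`__contains__`, `get_queue()`, and `add()`'s TTL scoring), assuming a local Redis:

{% highlight python %}
from redis import StrictRedis
from rq.registry import FailedJobRegistry

connection = StrictRedis()
registry = FailedJobRegistry('default', connection=connection)

print(registry.get_queue())        # Queue('default'), via get_queue() above
print('some-job-id' in registry)   # __contains__ takes a Job or a job id

# add() scores a job at now + ttl (DEFAULT_FAILURE_TTL when ttl is None);
# a negative ttl stores a negative score, which cleanup() never removes.
{% endhighlight %}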
diff --git a/rq/worker.py b/rq/worker.py
index cdfa33f..b018519 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -24,13 +24,16 @@ from redis import WatchError
from . import worker_registration
from .compat import PY2, as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection
-from .defaults import (DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
+
+from .defaults import (DEFAULT_FAILURE_TTL, DEFAULT_RESULT_TTL,
+ DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
from .exceptions import DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import setup_loghandlers
-from .queue import Queue, get_failed_queue
-from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries
+from .queue import Queue
+from .registry import (FailedJobRegistry, FinishedJobRegistry,
+ StartedJobRegistry, clean_registries)
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import (backend_class, ensure_list, enum,
@@ -159,7 +162,7 @@ class Worker(object):
def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL,
connection=None, exc_handler=None, exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None,
- queue_class=None,
+ queue_class=None, disable_default_exception_handler=False,
job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL): # noqa
if connection is None:
connection = get_current_connection()
@@ -187,27 +190,17 @@ class Worker(object):
self._horse_pid = 0
self._stop_requested = False
self.log = logger
- self.failed_queue = get_failed_queue(connection=self.connection,
- job_class=self.job_class)
self.last_cleaned_at = None
self.successful_job_count = 0
self.failed_job_count = 0
self.total_working_time = 0
self.birth_date = None
- # By default, push the "move-to-failed-queue" exception handler onto
- # the stack
- if exception_handlers is None:
- self.push_exc_handler(self.move_to_failed_queue)
- if exc_handler is not None:
- self.push_exc_handler(exc_handler)
- warnings.warn(
- "exc_handler is deprecated, pass a list to exception_handlers instead.",
- DeprecationWarning
- )
- elif isinstance(exception_handlers, list):
- for h in exception_handlers:
- self.push_exc_handler(h)
+ self.disable_default_exception_handler = disable_default_exception_handler
+
+ if isinstance(exception_handlers, list):
+ for handler in exception_handlers:
+ self.push_exc_handler(handler)
elif exception_handlers is not None:
self.push_exc_handler(exception_handlers)
@@ -638,19 +631,17 @@ class Worker(object):
if not job.ended_at:
job.ended_at = utcnow()
- self.handle_job_failure(job=job)
-
# Unhandled failure: move the job to the failed queue
self.log.warning((
- 'Moving job to {0!r} queue '
- '(work-horse terminated unexpectedly; waitpid returned {1})'
- ).format(self.failed_queue.name, ret_val))
- self.failed_queue.quarantine(
+ 'Moving job to FailedJobRegistry '
+ '(work-horse terminated unexpectedly; waitpid returned {})'
+ ).format(ret_val))
+
+ self.handle_job_failure(
job,
- exc_info=(
- "Work-horse process was terminated unexpectedly "
- "(waitpid returned {0})"
- ).format(ret_val)
+ exc_string="Work-horse process was terminated unexpectedly "
+ "(waitpid returned %s)" % ret_val
)
def execute_job(self, job, queue):
@@ -717,24 +708,37 @@ class Worker(object):
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
-    def handle_job_failure(self, job, started_job_registry=None):
+    def handle_job_failure(self, job, started_job_registry=None,
+                           exc_string=''):
"""Handles the failure of an executing job by:
1. Setting the job status to failed
- 2. Removing the job from the started_job_registry
+ 2. Removing the job from StartedJobRegistry
3. Setting the worker's current job to None
+            4. Adding the job to FailedJobRegistry
"""
with self.connection._pipeline() as pipeline:
if started_job_registry is None:
- started_job_registry = StartedJobRegistry(job.origin,
- self.connection,
- job_class=self.job_class)
+ started_job_registry = StartedJobRegistry(
+ job.origin,
+ self.connection,
+ job_class=self.job_class
+ )
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
+
+ if not self.disable_default_exception_handler:
+ failed_job_registry = FailedJobRegistry(job.origin, job.connection,
+ job_class=self.job_class)
+ failed_job_registry.add(job, ttl=job.failure_ttl,
+ exc_string=exc_string, pipeline=pipeline)
+
self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline)
if job.started_at and job.ended_at:
- self.increment_total_working_time(job.ended_at - job.started_at,
- pipeline)
+ self.increment_total_working_time(
+ job.ended_at - job.started_at,
+ pipeline
+ )
try:
pipeline.execute()
@@ -785,7 +789,6 @@ class Worker(object):
inside the work horse's process.
"""
self.prepare_job_execution(job, heartbeat_ttl)
-
push_connection(self.connection)
started_job_registry = StartedJobRegistry(job.origin,
@@ -803,15 +806,18 @@ class Worker(object):
# Pickle the result in the same try-except block since we need
# to use the same exc handling when pickling fails
job._result = rv
-
self.handle_job_success(job=job,
queue=queue,
started_job_registry=started_job_registry)
except:
job.ended_at = utcnow()
- self.handle_job_failure(job=job,
+ exc_info = sys.exc_info()
+ exc_string = self._get_safe_exception_string(
+ traceback.format_exception(*exc_info)
+ )
+ self.handle_job_failure(job=job, exc_string=exc_string,
started_job_registry=started_job_registry)
- self.handle_exception(job, *sys.exc_info())
+ self.handle_exception(job, *exc_info)
return False
finally:
@@ -845,7 +851,7 @@ class Worker(object):
'queue': job.origin,
})
- for handler in reversed(self._exc_handlers):
+ for handler in self._exc_handlers:
self.log.debug('Invoking exception handler %s', handler)
fallthrough = handler(job, *exc_info)
@@ -857,12 +863,6 @@ class Worker(object):
if not fallthrough:
break
- def move_to_failed_queue(self, job, *exc_info):
- """Default exception handler: move the job to the failed queue."""
- self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
- from .handlers import move_to_failed_queue
- move_to_failed_queue(job, *exc_info)
-
@staticmethod
def _get_safe_exception_string(exc_strings):
"""Ensure list of exception strings is decoded on Python 2 and joined as one string safely."""
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 16c2134..882bdad 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -119,6 +119,12 @@ def black_hole(job, *exc_info):
return False
+def add_meta(job, *exc_info):
+ job.meta = {'foo': 1}
+ job.save()
+ return True
+
+
def save_key_ttl(key):
# Stores key ttl in meta
job = get_current_job()
diff --git a/tests/test_cli.py b/tests/test_cli.py
index c306c81..198ec2f 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -3,15 +3,21 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
from click.testing import CliRunner
-from rq import get_failed_queue, Queue
+from redis import Redis
+
+from rq import Queue
from rq.compat import is_python_version
-from rq.job import Job
from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig
+from rq.job import Job
+from rq.queue import Queue
+from rq.registry import FailedJobRegistry
+from rq.worker import Worker
+
import pytest
from tests import RQTestCase
-from tests.fixtures import div_by_zero
+from tests.fixtures import add_meta, div_by_zero, say_hello
if is_python_version((2, 7), (3, 2)):
from unittest import TestCase
@@ -40,11 +46,11 @@ class TestRQCli(RQTestCase):
super(TestRQCli, self).setUp()
db_num = self.testconn.connection_pool.connection_kwargs['db']
self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num
+ self.connection = Redis.from_url(self.redis_url)
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
- get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
def test_config_file(self):
settings = read_config_file('tests.dummy_settings')
@@ -111,36 +117,57 @@ class TestRQCli(RQTestCase):
self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), 'Nothing to do')
- def test_empty_failed(self):
- """rq empty -u <url> failed"""
- runner = CliRunner()
- result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
- self.assert_normal_execution(result)
- self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
-
- def test_empty_all(self):
- """rq empty -u <url> failed --all"""
- runner = CliRunner()
- result = runner.invoke(main, ['empty', '-u', self.redis_url, '--all'])
- self.assert_normal_execution(result)
- self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
-
def test_requeue(self):
"""rq requeue -u <url> --all"""
+ connection = Redis.from_url(self.redis_url)
+ queue = Queue('requeue', connection=connection)
+ registry = queue.failed_job_registry
+
runner = CliRunner()
- result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
+
+ job = queue.enqueue(div_by_zero)
+ job2 = queue.enqueue(div_by_zero)
+ job3 = queue.enqueue(div_by_zero)
+
+ worker = Worker([queue])
+ worker.work(burst=True)
+
+ self.assertIn(job, registry)
+ self.assertIn(job2, registry)
+ self.assertIn(job3, registry)
+
+ result = runner.invoke(
+ main,
+ ['requeue', '-u', self.redis_url, '--queue', 'requeue', job.id]
+ )
self.assert_normal_execution(result)
- self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
- result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
+ # Only the first specified job is requeued
+ self.assertNotIn(job, registry)
+ self.assertIn(job2, registry)
+ self.assertIn(job3, registry)
+
+ result = runner.invoke(
+ main,
+ ['requeue', '-u', self.redis_url, '--queue', 'requeue', '--all']
+ )
self.assert_normal_execution(result)
- self.assertEqual(result.output.strip(), 'Nothing to do')
+ # With --all flag, all failed jobs are requeued
+ self.assertNotIn(job2, registry)
+ self.assertNotIn(job3, registry)
def test_info(self):
"""rq info -u <url>"""
runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url])
self.assert_normal_execution(result)
+ self.assertIn('0 queues, 0 jobs total', result.output)
+
+ queue = Queue(connection=self.connection)
+ queue.enqueue(say_hello)
+
+ result = runner.invoke(main, ['info', '-u', self.redis_url])
+ self.assert_normal_execution(result)
self.assertIn('1 queues, 1 jobs total', result.output)
def test_info_only_queues(self):
@@ -148,6 +175,13 @@ class TestRQCli(RQTestCase):
runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-queues'])
self.assert_normal_execution(result)
+ self.assertIn('0 queues, 0 jobs total', result.output)
+
+ queue = Queue(connection=self.connection)
+ queue.enqueue(say_hello)
+
+ result = runner.invoke(main, ['info', '-u', self.redis_url])
+ self.assert_normal_execution(result)
self.assertIn('1 queues, 1 jobs total', result.output)
def test_info_only_workers(self):
@@ -155,6 +189,12 @@ class TestRQCli(RQTestCase):
runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
+ self.assertIn('0 workers, 0 queue', result.output)
+
+ queue = Queue(connection=self.connection)
+ queue.enqueue(say_hello)
+ result = runner.invoke(main, ['info', '-u', self.redis_url, '--only-workers'])
+ self.assert_normal_execution(result)
self.assertIn('0 workers, 1 queues', result.output)
def test_worker(self):
@@ -173,22 +213,41 @@ class TestRQCli(RQTestCase):
def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>"""
- q = Queue()
- failed_q = get_failed_queue()
- failed_q.empty()
-
+ connection = Redis.from_url(self.redis_url)
+ q = Queue('default', connection=connection)
runner = CliRunner()
- # If exception handler is not given, failed job goes to FailedQueue
- q.enqueue(div_by_zero)
+ # If exception handler is not given, no custom exception handler is run
+ job = q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
- self.assertEqual(failed_q.count, 1)
+ registry = FailedJobRegistry(queue=q)
+ self.assertTrue(job in registry)
+
+ # If disable-default-exception-handler is given, job is not moved to FailedJobRegistry
+ job = q.enqueue(div_by_zero)
+ runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
+ '--disable-default-exception-handler'])
+ registry = FailedJobRegistry(queue=q)
+ self.assertFalse(job in registry)
- # Black hole exception handler doesn't add failed jobs to FailedQueue
- q.enqueue(div_by_zero)
+        # Both the default and custom exception handlers are run
+ job = q.enqueue(div_by_zero)
+ runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
+ '--exception-handler', 'tests.fixtures.add_meta'])
+ registry = FailedJobRegistry(queue=q)
+ self.assertTrue(job in registry)
+ job.refresh()
+ self.assertEqual(job.meta, {'foo': 1})
+
+ # Only custom exception handler is run
+ job = q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
- '--exception-handler', 'tests.fixtures.black_hole'])
- self.assertEqual(failed_q.count, 1)
+ '--exception-handler', 'tests.fixtures.add_meta',
+ '--disable-default-exception-handler'])
+ registry = FailedJobRegistry(queue=q)
+ self.assertFalse(job in registry)
+ job.refresh()
+ self.assertEqual(job.meta, {'foo': 1})
def test_suspend_and_resume(self):
"""rq suspend -u <url>
diff --git a/tests/test_job.py b/tests/test_job.py
index d940f6d..4e34574 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -18,8 +18,10 @@ from tests import fixtures, RQTestCase
from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError
-from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job
-from rq.queue import Queue, get_failed_queue
+from rq.job import Job, get_current_job, JobStatus, cancel_job
+from rq.queue import Queue
+from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
+ FinishedJobRegistry, StartedJobRegistry)
from rq.utils import utcformat
from rq.worker import Worker
@@ -360,6 +362,18 @@ class TestJob(RQTestCase):
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, None)
+ def test_failure_ttl_is_persisted(self):
+ """Ensure job.failure_ttl is set and restored properly"""
+ job = Job.create(func=fixtures.say_hello, args=('Lionel',), failure_ttl=15)
+ job.save()
+ Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.failure_ttl, 15)
+
+ job = Job.create(func=fixtures.say_hello, args=('Lionel',))
+ job.save()
+ Job.fetch(job.id, connection=self.testconn)
+ self.assertEqual(job.failure_ttl, None)
+
def test_description_is_persisted(self):
"""Ensure that job's custom description is set properly"""
job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!')
@@ -383,10 +397,11 @@ class TestJob(RQTestCase):
def test_job_access_within_job_function(self):
"""The current job is accessible within the job function."""
q = Queue()
- q.enqueue(fixtures.access_self) # access_self calls get_current_job() and asserts
+ job = q.enqueue(fixtures.access_self)
w = Worker([q])
w.work(burst=True)
- assert get_failed_queue(self.testconn).count == 0
+ # access_self calls get_current_job() and executes successfully
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
def test_job_access_within_synchronous_job_function(self):
queue = Queue(is_async=False)
@@ -483,6 +498,48 @@ class TestJob(RQTestCase):
self.assertNotIn(job.id, queue.get_job_ids())
+ def test_job_delete_removes_itself_from_registries(self):
+ """job.delete() should remove itself from job registries"""
+ connection = self.testconn
+ job = Job.create(func=fixtures.say_hello, status=JobStatus.FAILED,
+ connection=self.testconn, origin='default')
+ job.save()
+ registry = FailedJobRegistry(connection=self.testconn)
+ registry.add(job, 500)
+
+ job.delete()
+ self.assertFalse(job in registry)
+
+ job = Job.create(func=fixtures.say_hello, status=JobStatus.FINISHED,
+ connection=self.testconn, origin='default')
+ job.save()
+
+ registry = FinishedJobRegistry(connection=self.testconn)
+ registry.add(job, 500)
+
+ job.delete()
+ self.assertFalse(job in registry)
+
+ job = Job.create(func=fixtures.say_hello, status=JobStatus.STARTED,
+ connection=self.testconn, origin='default')
+ job.save()
+
+ registry = StartedJobRegistry(connection=self.testconn)
+ registry.add(job, 500)
+
+ job.delete()
+ self.assertFalse(job in registry)
+
+ job = Job.create(func=fixtures.say_hello, status=JobStatus.DEFERRED,
+ connection=self.testconn, origin='default')
+ job.save()
+
+ registry = DeferredJobRegistry(connection=self.testconn)
+ registry.add(job, 500)
+
+ job.delete()
+ self.assertFalse(job in registry)
+
def test_job_with_dependents_delete_parent_with_saved(self):
"""job.delete() deletes itself from Redis but not dependents. If the
dependent job was saved, it will remain in redis."""
@@ -574,31 +631,6 @@ class TestJob(RQTestCase):
cancel_job(job.id)
self.assertEqual(0, len(queue.get_jobs()))
- def test_create_failed_and_cancel_job(self):
- """test creating and using cancel_job deletes job properly"""
- failed_queue = get_failed_queue(connection=self.testconn)
- job = failed_queue.enqueue(fixtures.say_hello)
- job.set_status(JobStatus.FAILED)
- self.assertEqual(1, len(failed_queue.get_jobs()))
- cancel_job(job.id)
- self.assertEqual(0, len(failed_queue.get_jobs()))
-
- def test_create_and_requeue_job(self):
- """Requeueing existing jobs."""
- job = Job.create(func=fixtures.div_by_zero, args=(1, 2, 3))
- job.origin = 'fake'
- job.save()
- get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
-
- self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa
- self.assertEqual(get_failed_queue().count, 1)
-
- requeued_job = requeue_job(job.id)
-
- self.assertEqual(get_failed_queue().count, 0)
- self.assertEqual(Queue('fake').count, 1)
- self.assertEqual(requeued_job.origin, job.origin)
-
def test_dependents_key_for_should_return_prefixed_job_id(self):
"""test redis key to store job dependents hash under"""
job_id = 'random'
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 2407c28..0434c62 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -3,11 +3,10 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
from tests import RQTestCase
-from tests.fixtures import (div_by_zero, echo, Number, say_hello,
- some_calculation)
+from tests.fixtures import echo, Number, say_hello
-from rq import get_failed_queue, Queue
-from rq.exceptions import InvalidJobDependency, InvalidJobOperationError
+from rq import Queue
+from rq.exceptions import InvalidJobDependency
from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry
from rq.worker import Worker
@@ -319,6 +318,13 @@ class TestQueue(RQTestCase):
self.assertEqual(job.meta['foo'], 'bar')
self.assertEqual(job.meta['baz'], 42)
+ def test_enqueue_with_failure_ttl(self):
+ """enqueue() properly sets job.failure_ttl"""
+ q = Queue()
+ job = q.enqueue(say_hello, failure_ttl=10)
+ job.refresh()
+ self.assertEqual(job.failure_ttl, 10)
+
def test_enqueue_explicit_args(self):
"""enqueue() works for both implicit/explicit args."""
q = Queue()
@@ -545,160 +551,3 @@ class TestQueue(RQTestCase):
job_fetch = q1.fetch_job(job_orig.id)
self.assertIsNotNone(job_fetch)
-
-
-class TestFailedQueue(RQTestCase):
- def test_get_failed_queue(self):
- """Use custom job class"""
- class CustomJob(Job):
- pass
- failed_queue = get_failed_queue(job_class=CustomJob)
- self.assertIs(failed_queue.job_class, CustomJob)
-
- failed_queue = get_failed_queue(job_class='rq.job.Job')
- self.assertIsNot(failed_queue.job_class, CustomJob)
-
- def test_requeue_job(self):
- """Requeueing existing jobs."""
- job = Job.create(func=div_by_zero, args=(1, 2, 3))
- job.origin = 'fake'
- job.save()
- get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
-
- self.assertEqual(Queue.all(), [get_failed_queue()]) # noqa
- self.assertEqual(get_failed_queue().count, 1)
-
- requeued_job = get_failed_queue().requeue(job.id)
-
- self.assertEqual(get_failed_queue().count, 0)
- self.assertEqual(Queue('fake').count, 1)
- self.assertEqual(requeued_job.origin, job.origin)
-
- def test_get_job_on_failed_queue(self):
- default_queue = Queue()
- failed_queue = get_failed_queue()
-
- job = default_queue.enqueue(div_by_zero, args=(1, 2, 3))
-
- job_on_default_queue = default_queue.fetch_job(job.id)
- job_on_failed_queue = failed_queue.fetch_job(job.id)
-
- self.assertIsNotNone(job_on_default_queue)
- self.assertIsNone(job_on_failed_queue)
-
- job.set_status(JobStatus.FAILED)
-
- job_on_default_queue = default_queue.fetch_job(job.id)
- job_on_failed_queue = failed_queue.fetch_job(job.id)
-
- self.assertIsNotNone(job_on_default_queue)
- self.assertIsNotNone(job_on_failed_queue)
- self.assertTrue(job_on_default_queue.is_failed)
-
- def test_requeue_nonfailed_job_fails(self):
- """Requeueing non-failed jobs raises error."""
- q = Queue()
- job = q.enqueue(say_hello, 'Nick', foo='bar')
-
- # Assert that we cannot requeue a job that's not on the failed queue
- with self.assertRaises(InvalidJobOperationError):
- get_failed_queue().requeue(job.id)
-
- def test_quarantine_preserves_timeout(self):
- """Quarantine preserves job timeout."""
- job = Job.create(func=div_by_zero, args=(1, 2, 3))
- job.origin = 'fake'
- job.timeout = 200
- job.save()
- get_failed_queue().quarantine(job, Exception('Some fake error'))
-
- self.assertEqual(job.timeout, 200)
-
- def test_requeueing_preserves_timeout(self):
- """Requeueing preserves job timeout."""
- job = Job.create(func=div_by_zero, args=(1, 2, 3))
- job.origin = 'fake'
- job.timeout = 200
- job.save()
- get_failed_queue().quarantine(job, Exception('Some fake error'))
- get_failed_queue().requeue(job.id)
-
- job = Job.fetch(job.id)
- self.assertEqual(job.timeout, 200)
-
- def test_requeue_sets_status_to_queued(self):
- """Requeueing a job should set its status back to QUEUED."""
- job = Job.create(func=div_by_zero, args=(1, 2, 3))
- job.save()
- get_failed_queue().quarantine(job, Exception('Some fake error'))
- get_failed_queue().requeue(job.id)
-
- job = Job.fetch(job.id)
- self.assertEqual(job.get_status(), JobStatus.QUEUED)
-
- def test_enqueue_preserves_result_ttl(self):
- """Enqueueing persists result_ttl."""
- q = Queue()
- job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10)
- self.assertEqual(job.result_ttl, 10)
- job_from_queue = Job.fetch(job.id, connection=self.testconn)
- self.assertEqual(int(job_from_queue.result_ttl), 10)
-
- def test_async_false(self):
- """Job executes and cleaned up immediately if is_async=False."""
- q = Queue(is_async=False)
- job = q.enqueue(some_calculation, args=(2, 3))
- self.assertEqual(job.return_value, 6)
- self.assertNotEqual(self.testconn.ttl(job.key), -1)
-
- def test_is_async(self):
- """Queue exposes is_async as a property."""
- inline_queue = Queue(is_async=False)
- self.assertFalse(inline_queue.is_async)
- async_queue = Queue(is_async=True)
- self.assertTrue(async_queue.is_async)
-
- def test_custom_job_class(self):
- """Ensure custom job class assignment works as expected."""
- q = Queue(job_class=CustomJob)
- self.assertEqual(q.job_class, CustomJob)
-
- def test_skip_queue(self):
- """Ensure the skip_queue option functions"""
- q = Queue('foo')
- job1 = q.enqueue(say_hello)
- job2 = q.enqueue(say_hello)
- assert q.dequeue() == job1
- skip_job = q.enqueue(say_hello, at_front=True)
- assert q.dequeue() == skip_job
- assert q.dequeue() == job2
-
- def test_job_deletion(self):
- """Ensure job.delete() removes itself from FailedQueue."""
- job = Job.create(func=div_by_zero, args=(1, 2, 3))
- job.origin = 'fake'
- job.timeout = 200
- job.save()
-
- job.set_status(JobStatus.FAILED)
-
- failed_queue = get_failed_queue()
- failed_queue.quarantine(job, Exception('Some fake error'))
-
- self.assertTrue(job.id in failed_queue.get_job_ids())
-
- job.delete()
- self.assertFalse(job.id in failed_queue.get_job_ids())
-
- def test_job_in_failed_queue_persists(self):
- """Make sure failed job key does not expire"""
- q = Queue('foo')
- job = q.enqueue(div_by_zero, args=(1,), ttl=5)
- self.assertEqual(self.testconn.ttl(job.key), 5)
-
- self.assertRaises(ZeroDivisionError, job.perform)
- job.set_status(JobStatus.FAILED)
- failed_queue = get_failed_queue()
- failed_queue.quarantine(job, Exception('Some fake error'))
-
- self.assertEqual(self.testconn.ttl(job.key), -1)
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 41d0c1c..8790934 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -2,12 +2,15 @@
from __future__ import absolute_import
from rq.compat import as_text
-from rq.job import Job, JobStatus
-from rq.queue import FailedQueue, Queue
+from rq.defaults import DEFAULT_FAILURE_TTL
+from rq.exceptions import InvalidJobOperation
+from rq.job import Job, JobStatus, requeue_job
+from rq.queue import Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import (clean_registries, DeferredJobRegistry,
- FinishedJobRegistry, StartedJobRegistry)
+ FailedJobRegistry, FinishedJobRegistry,
+ StartedJobRegistry)
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
@@ -41,6 +44,19 @@ class TestRegistry(RQTestCase):
registry = StartedJobRegistry(job_class=CustomJob)
self.assertFalse(registry.job_class == self.registry.job_class)
+ def test_contains(self):
+ registry = StartedJobRegistry(connection=self.testconn)
+ queue = Queue(connection=self.testconn)
+ job = queue.enqueue(say_hello)
+
+ self.assertFalse(job in registry)
+ self.assertFalse(job.id in registry)
+
+ registry.add(job, 5)
+
+ self.assertTrue(job in registry)
+ self.assertTrue(job.id in registry)
+
def test_add_and_remove(self):
"""Adding and removing job to StartedJobRegistry."""
timestamp = current_timestamp()
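
The membership checks exercised above (`job in registry` and `job.id in registry`) imply the registry grew a `__contains__` backed by a ZSCORE lookup. A minimal sketch of that idea, assuming registries keep job ids as members of a Redis sorted set (the committed body may differ):

    def __contains__(self, item):
        # Accept either a Job instance or a bare job id.
        job_id = item.id if hasattr(item, 'id') else item
        return self.connection.zscore(self.key, job_id) is not None
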
@@ -78,23 +94,22 @@ class TestRegistry(RQTestCase):
self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
['foo', 'bar'])
- def test_cleanup(self):
- """Moving expired jobs to FailedQueue."""
- failed_queue = FailedQueue(connection=self.testconn)
- self.assertTrue(failed_queue.is_empty())
-
+ def test_cleanup_moves_jobs_to_failed_job_registry(self):
+ """Moving expired jobs to FailedJobRegistry."""
queue = Queue(connection=self.testconn)
+ failed_job_registry = FailedJobRegistry(connection=self.testconn)
job = queue.enqueue(say_hello)
self.testconn.zadd(self.registry.key, 2, job.id)
+ # Job has not been moved to FailedJobRegistry
self.registry.cleanup(1)
- self.assertNotIn(job.id, failed_queue.job_ids)
- self.assertEqual(self.testconn.zscore(self.registry.key, job.id), 2)
+ self.assertNotIn(job, failed_job_registry)
+ self.assertIn(job, self.registry)
self.registry.cleanup()
- self.assertIn(job.id, failed_queue.job_ids)
- self.assertEqual(self.testconn.zscore(self.registry.key, job.id), None)
+ self.assertIn(job.id, failed_job_registry)
+ self.assertNotIn(job, self.registry)
job.refresh()
self.assertEqual(job.get_status(), JobStatus.FAILED)
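
The rewritten test pins down the new `cleanup` contract: entries whose score has passed the cutoff are quarantined into `FailedJobRegistry` and marked FAILED, while entries still inside their window are left alone. Roughly, assuming the cutoff defaults to the current timestamp (a sketch, not the shipped implementation):

    def cleanup(self, timestamp=None):
        # Quarantine expired entries into the FailedJobRegistry.
        score = timestamp if timestamp is not None else current_timestamp()
        failed_registry = FailedJobRegistry(self.name,
                                            connection=self.connection)
        for job_id in self.get_expired_job_ids(score):
            job = self.job_class.fetch(job_id, connection=self.connection)
            job.set_status(JobStatus.FAILED)
            failed_registry.add(job)               # kept for its failure TTL
            self.connection.zrem(self.key, job_id)
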
@@ -158,9 +173,22 @@ class TestRegistry(RQTestCase):
started_job_registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(started_job_registry.key, 1, 'foo')
+ failed_job_registry = FailedJobRegistry(connection=self.testconn)
+ self.testconn.zadd(failed_job_registry.key, 1, 'foo')
+
clean_registries(queue)
self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
+ self.assertEqual(self.testconn.zcard(failed_job_registry.key), 0)
+
+ def test_get_queue(self):
+ """registry.get_queue() returns the right Queue object."""
+ registry = StartedJobRegistry(connection=self.testconn)
+ self.assertEqual(registry.get_queue(), Queue(connection=self.testconn))
+
+ registry = StartedJobRegistry('foo', connection=self.testconn)
+ self.assertEqual(registry.get_queue(),
+ Queue('foo', connection=self.testconn))
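
`get_queue()` is small enough to restate inline; the assertions above imply little more than (sketch):

    def get_queue(self):
        # Rebuild the Queue this registry belongs to from its own name.
        return Queue(self.name, connection=self.connection)
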
class TestFinishedJobRegistry(RQTestCase):
@@ -225,7 +253,7 @@ class TestDeferredRegistry(RQTestCase):
self.assertEqual(job_ids, [job.id])
def test_register_dependency(self):
- """Ensure job creation and deletion works properly with DeferredJobRegistry."""
+ """Ensure job creation and deletion works with DeferredJobRegistry."""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
job2 = queue.enqueue(say_hello, depends_on=job)
@@ -236,3 +264,119 @@ class TestDeferredRegistry(RQTestCase):
# When deleted, job removes itself from DeferredJobRegistry
job2.delete()
self.assertEqual(registry.get_job_ids(), [])
+
+
+class TestFailedJobRegistry(RQTestCase):
+
+ def test_default_failure_ttl(self):
+ """Job TTL defaults to DEFAULT_FAILURE_TTL"""
+ queue = Queue(connection=self.testconn)
+ job = queue.enqueue(say_hello)
+
+ registry = FailedJobRegistry(connection=self.testconn)
+ key = registry.key
+
+ timestamp = current_timestamp()
+ registry.add(job)
+ self.assertLess(
+ self.testconn.zscore(key, job.id),
+ timestamp + DEFAULT_FAILURE_TTL + 2
+ )
+ self.assertGreater(
+ self.testconn.zscore(key, job.id),
+ timestamp + DEFAULT_FAILURE_TTL - 2
+ )
+
+ timestamp = current_timestamp()
+ ttl = 5
+ registry.add(job, ttl=5)
+ self.assertLess(
+ self.testconn.zscore(key, job.id),
+ timestamp + ttl + 2
+ )
+ self.assertGreater(
+ self.testconn.zscore(key, job.id),
+ timestamp + ttl - 2
+ )
+
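
The bracketing assertLess/assertGreater pairs fix the scoring scheme: a failed job's sorted-set score is its expiry time, i.e. the current timestamp plus the failure TTL, with `DEFAULT_FAILURE_TTL` used when no TTL is given. A hedged sketch of the matching `add` (the shipped signature may carry extra arguments, such as the exception string):

    def add(self, job, ttl=None):
        # Score the entry with its expiry time; zadd(key, score, member)
        # follows the redis-py 2.x call style used elsewhere in these tests.
        ttl = ttl if ttl is not None else DEFAULT_FAILURE_TTL
        score = current_timestamp() + ttl
        return self.connection.zadd(self.key, score, job.id)
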
+ def test_requeue(self):
+ """FailedJobRegistry.requeue works properly"""
+ queue = Queue(connection=self.testconn)
+ job = queue.enqueue(div_by_zero, failure_ttl=5)
+
+ worker = Worker([queue])
+ worker.work(burst=True)
+
+ registry = FailedJobRegistry(connection=worker.connection)
+ self.assertTrue(job in registry)
+
+ registry.requeue(job.id)
+ self.assertFalse(job in registry)
+ self.assertIn(job.id, queue.get_job_ids())
+
+ job.refresh()
+ self.assertEqual(job.status, JobStatus.QUEUED)
+
+ worker.work(burst=True)
+ self.assertTrue(job in registry)
+
+ # Should also work with job instance
+ registry.requeue(job)
+ self.assertFalse(job in registry)
+ self.assertIn(job.id, queue.get_job_ids())
+
+ job.refresh()
+ self.assertEqual(job.status, JobStatus.QUEUED)
+
+ worker.work(burst=True)
+ self.assertTrue(job in registry)
+
+ # requeue_job should work the same way
+ requeue_job(job.id, connection=self.testconn)
+ self.assertFalse(job in registry)
+ self.assertIn(job.id, queue.get_job_ids())
+
+ job.refresh()
+ self.assertEqual(job.status, JobStatus.QUEUED)
+
+ worker.work(burst=True)
+ self.assertTrue(job in registry)
+
+ # And so does job.requeue()
+ job.requeue()
+ self.assertFalse(job in registry)
+ self.assertIn(job.id, queue.get_job_ids())
+
+ job.refresh()
+ self.assertEqual(job.status, JobStatus.QUEUED)
+
+ def test_invalid_job(self):
+ """Requeuing a job that's not in FailedJobRegistry raises an error."""
+ queue = Queue(connection=self.testconn)
+ job = queue.enqueue(say_hello)
+
+ registry = FailedJobRegistry(connection=self.testconn)
+ with self.assertRaises(InvalidJobOperation):
+ registry.requeue(job)
+
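
Between them, `test_requeue` and `test_invalid_job` cover every requeue entry point (`registry.requeue(job_or_id)`, the module-level `requeue_job(job_id, connection=...)`, and `job.requeue()`) as well as the error path. The semantics they assert can be sketched as follows (not the committed code):

    def requeue(self, job_or_id):
        # Accept a Job or a job id; removal from the sorted set doubles
        # as the membership check.
        job_id = job_or_id.id if hasattr(job_or_id, 'id') else job_or_id
        if not self.connection.zrem(self.key, job_id):
            raise InvalidJobOperation('Job is not in the FailedJobRegistry')
        job = self.job_class.fetch(job_id, connection=self.connection)
        queue = Queue(job.origin, connection=self.connection)
        return queue.enqueue_job(job)  # re-enqueues and resets status to QUEUED
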
+ def test_worker_handle_job_failure(self):
+ """Failed jobs are added to FailedJobRegistry"""
+ q = Queue(connection=self.testconn)
+
+ w = Worker([q])
+ registry = FailedJobRegistry(connection=w.connection)
+
+ timestamp = current_timestamp()
+
+ job = q.enqueue(div_by_zero)
+ w.handle_job_failure(job)
+ # job is added to FailedJobRegistry with default failure ttl
+ self.assertIn(job.id, registry.get_job_ids())
+ self.assertLess(self.testconn.zscore(registry.key, job.id),
+ timestamp + DEFAULT_FAILURE_TTL + 5)
+
+ # job is added to FailedJobRegistry with specified ttl
+ job = q.enqueue(div_by_zero, failure_ttl=5)
+ w.handle_job_failure(job)
+ self.assertLess(self.testconn.zscore(registry.key, job.id),
+ timestamp + 7)
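
`test_worker_handle_job_failure` ties the worker side to the registry: on failure the job is added with its own `failure_ttl` when set, falling back to the registry default otherwise. Together with `test_disable_default_exception_handler` in tests/test_worker.py below, the failure path reduces to roughly this (attribute names and signature are assumptions, not the shipped code):

    def handle_job_failure(self, job, *exc_info):
        # Mark the job failed, then quarantine it unless the default
        # handler has been switched off.
        job.set_status(JobStatus.FAILED)
        if not self.disable_default_exception_handler:
            registry = FailedJobRegistry(job.origin,
                                         connection=self.connection)
            registry.add(job, ttl=job.failure_ttl)  # None -> registry default
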
diff --git a/tests/test_sentry.py b/tests/test_sentry.py
index 7fa7bcb..7557dc4 100644
--- a/tests/test_sentry.py
+++ b/tests/test_sentry.py
@@ -2,8 +2,9 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
-from rq import get_failed_queue, Queue, Worker
+from rq import Queue, Worker
from rq.contrib.sentry import register_sentry
+from rq.registry import FailedJobRegistry
from tests import RQTestCase
@@ -20,10 +21,9 @@ class TestSentry(RQTestCase):
def test_work_fails(self):
"""Non importable jobs should be put on the failed queue event with sentry"""
q = Queue()
- failed_q = get_failed_queue()
# Action
- q.enqueue('_non.importable.job')
+ job = q.enqueue('_non.importable.job')
self.assertEqual(q.count, 1)
w = Worker([q])
@@ -32,5 +32,6 @@ class TestSentry(RQTestCase):
w.work(burst=True)
# Postconditions
- self.assertEqual(failed_q.count, 1)
+ registry = FailedJobRegistry(queue=q)
+ self.assertTrue(job in registry)
self.assertEqual(q.count, 0)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 2f0a9df..d2d9c26 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -27,11 +27,10 @@ from tests.fixtures import (
modify_self_and_error, long_running_job, save_key_ttl
)
-from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
- get_current_connection)
+from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.compat import as_text, PY2
from rq.job import Job, JobStatus
-from rq.registry import StartedJobRegistry
+from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.suspension import resume, suspend
from rq.utils import utcnow
from rq.worker import HerokuWorker, WorkerStatus
@@ -197,17 +196,14 @@ class TestWorker(RQTestCase):
)
def test_work_is_unreadable(self):
- """Unreadable jobs are put on the failed queue."""
+ """Unreadable jobs are put on the failed job registry."""
q = Queue()
- failed_q = get_failed_queue()
-
- self.assertEqual(failed_q.count, 0)
self.assertEqual(q.count, 0)
# NOTE: We have to fake this enqueueing for this test case.
# What we're simulating here is a call to a function that is not
# importable from the worker process.
- job = Job.create(func=div_by_zero, args=(3,))
+ job = Job.create(func=div_by_zero, args=(3,), origin=q.name)
job.save()
job_data = job.data
@@ -225,7 +221,9 @@ class TestWorker(RQTestCase):
w = Worker([q])
w.work(burst=True) # should silently pass
self.assertEqual(q.count, 0)
- self.assertEqual(failed_q.count, 1)
+
+ failed_job_registry = FailedJobRegistry(queue=q)
+ self.assertTrue(job in failed_job_registry)
def test_heartbeat(self):
"""Heartbeat saves last_heartbeat"""
@@ -268,10 +266,6 @@ class TestWorker(RQTestCase):
def test_work_fails(self):
"""Failing jobs are put on the failed queue."""
q = Queue()
- failed_q = get_failed_queue()
-
- # Preconditions
- self.assertEqual(failed_q.count, 0)
self.assertEqual(q.count, 0)
# Action
@@ -286,7 +280,8 @@ class TestWorker(RQTestCase):
# Postconditions
self.assertEqual(q.count, 0)
- self.assertEqual(failed_q.count, 1)
+ failed_job_registry = FailedJobRegistry(queue=q)
+ self.assertTrue(job in failed_job_registry)
self.assertEqual(w.get_current_job_id(), None)
# Check the job
@@ -296,7 +291,7 @@ class TestWorker(RQTestCase):
# Should be the original enqueued_at date, not the date of enqueueing
# to the failed queue
self.assertEqual(str(job.enqueued_at), enqueued_at_date)
- self.assertIsNotNone(job.exc_info) # should contain exc_info
+ self.assertTrue(job.exc_info) # should contain exc_info
def test_statistics(self):
"""Successful and failed job counts are saved properly"""
@@ -328,33 +323,75 @@ class TestWorker(RQTestCase):
self.assertEqual(w.successful_job_count, 2)
self.assertEqual(w.total_working_time, 3000000)
+ def test_disable_default_exception_handler(self):
+ """
+ Job is not moved to FailedJobRegistry when the default exception
+ handler is disabled.
+ """
+ queue = Queue(name='default', connection=self.testconn)
+
+ job = queue.enqueue(div_by_zero)
+ worker = Worker([queue], disable_default_exception_handler=False)
+ worker.work(burst=True)
+
+ registry = FailedJobRegistry(queue=queue)
+ self.assertTrue(job in registry)
+
+ # Job is not added to FailedJobRegistry if
+ # disable_default_exception_handler is True
+ job = queue.enqueue(div_by_zero)
+ worker = Worker([queue], disable_default_exception_handler=True)
+ worker.work(burst=True)
+ self.assertFalse(job in registry)
+
def test_custom_exc_handling(self):
"""Custom exception handling."""
+
+ def first_handler(job, *exc_info):
+ job.meta = {'first_handler': True}
+ job.save_meta()
+ return True
+
+ def second_handler(job, *exc_info):
+ job.meta.update({'second_handler': True})
+ job.save_meta()
+
def black_hole(job, *exc_info):
# Don't fall through to default behaviour (moving to FailedJobRegistry)
return False
q = Queue()
- failed_q = get_failed_queue()
-
- # Preconditions
- self.assertEqual(failed_q.count, 0)
self.assertEqual(q.count, 0)
+ job = q.enqueue(div_by_zero)
+
+ w = Worker([q], exception_handlers=first_handler)
+ w.work(burst=True)
+
+ # Check the job
+ job.refresh()
+ self.assertEqual(job.is_failed, True)
+ self.assertTrue(job.meta['first_handler'])
- # Action
job = q.enqueue(div_by_zero)
- self.assertEqual(q.count, 1)
+ w = Worker([q], exception_handlers=[first_handler, second_handler])
+ w.work(burst=True)
- w = Worker([q], exception_handlers=black_hole)
- w.work(burst=True) # should silently pass
+ # Both custom exception handlers are run
+ job.refresh()
+ self.assertEqual(job.is_failed, True)
+ self.assertTrue(job.meta['first_handler'])
+ self.assertTrue(job.meta['second_handler'])
- # Postconditions
- self.assertEqual(q.count, 0)
- self.assertEqual(failed_q.count, 0)
+ job = q.enqueue(div_by_zero)
+ w = Worker([q], exception_handlers=[first_handler, black_hole,
+ second_handler])
+ w.work(burst=True)
- # Check the job
- job = Job.fetch(job.id)
+ # second_handler is not run since it's interrupted by black_hole
+ job.refresh()
self.assertEqual(job.is_failed, True)
+ self.assertTrue(job.meta['first_handler'])
+ self.assertEqual(job.meta.get('second_handler'), None)
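
Read together, the three scenarios fix the chaining rule: handlers run in the order they are passed, a bare return or `True` falls through to the next handler, and an explicit falsy return (here `black_hole`) stops the chain. The dispatch loop this implies, as a sketch over the worker's `_exc_handlers` list:

    def handle_exception(self, job, *exc_info):
        for handler in self._exc_handlers:
            fallthrough = handler(job, *exc_info)
            if fallthrough is None:
                fallthrough = True   # bare return keeps the chain going
            if not fallthrough:
                break                # explicit False stops it
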
def test_cancelled_jobs_arent_executed(self):
"""Cancelling jobs."""
@@ -771,7 +808,6 @@ class TestWorker(RQTestCase):
the job itself persists completely through the
queue/worker/job stack -- even if the job errored"""
q = Queue()
- failed_q = get_failed_queue()
# Also make sure that previously existing metadata
# persists properly
job = q.enqueue(modify_self_and_error, meta={'foo': 'bar', 'baz': 42},
@@ -782,7 +818,8 @@ class TestWorker(RQTestCase):
# Postconditions
self.assertEqual(q.count, 0)
- self.assertEqual(failed_q.count, 1)
+ failed_job_registry = FailedJobRegistry(queue=q)
+ self.assertTrue(job in failed_job_registry)
self.assertEqual(w.get_current_job_id(), None)
job_check = Job.fetch(job.id)
@@ -909,8 +946,6 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
completing the job) should set the job's status to FAILED
"""
fooq = Queue('foo')
- failed_q = get_failed_queue()
- self.assertEqual(failed_q.count, 0)
self.assertEqual(fooq.count, 0)
w = Worker(fooq)
sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
@@ -925,7 +960,8 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
job_status = job.get_status()
p.join(1)
self.assertEqual(job_status, JobStatus.FAILED)
- self.assertEqual(failed_q.count, 1)
+ failed_job_registry = FailedJobRegistry(queue=fooq)
+ self.assertTrue(job in failed_job_registry)
self.assertEqual(fooq.count, 0)
@@ -948,18 +984,20 @@ class TestWorkerSubprocess(RQTestCase):
def test_run_access_self(self):
"""Schedule a job, then run the worker as subprocess"""
q = Queue()
- q.enqueue(access_self)
+ job = q.enqueue(access_self)
subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
- assert get_failed_queue().count == 0
+ registry = FinishedJobRegistry(queue=q)
+ self.assertTrue(job in registry)
assert q.count == 0
@skipIf('pypy' in sys.version.lower(), 'often times out with pypy')
def test_run_scheduled_access_self(self):
"""Schedule a job that schedules a job, then run the worker as subprocess"""
q = Queue()
- q.enqueue(schedule_access_self)
+ job = q.enqueue(schedule_access_self)
subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
- assert get_failed_queue().count == 0
+ registry = FinishedJobRegistry(queue=q)
+ self.assertTrue(job in registry)
assert q.count == 0
@@ -1063,7 +1101,6 @@ class TestExceptionHandlerMessageEncoding(RQTestCase):
super(TestExceptionHandlerMessageEncoding, self).setUp()
self.worker = Worker("foo")
self.worker._exc_handlers = []
- self.worker.failed_queue = Mock()
# Mimic how exception info is actually passed forwards
try:
raise Exception(u"💪")
@@ -1073,7 +1110,3 @@ class TestExceptionHandlerMessageEncoding(RQTestCase):
def test_handle_exception_handles_non_ascii_in_exception_message(self):
"""worker.handle_exception doesn't crash on non-ascii in exception message."""
self.worker.handle_exception(Mock(), *self.exc_info)
-
- def test_move_to_failed_queue_handles_non_ascii_in_exception_message(self):
- """Test that move_to_failed_queue doesn't crash on non-ascii in exception message."""
- self.worker.move_to_failed_queue(Mock(), *self.exc_info)