author     Selwin Ong <selwin.ong@gmail.com>  2023-03-23 11:23:50 +0700
committer  Selwin Ong <selwin.ong@gmail.com>  2023-03-23 11:23:50 +0700
commit     a4a74ee377b97b8b7ce57b020c45b21ede56cff2 (patch)
tree       ef4c0e7c1c62f485f00f7b85277beecaf515c120
parent     63ed6216043ba2b5c3d18703d9945b0ecb25259e (diff)
parent     04722339d7598ff0c52f11c3680ed2dd922e6768 (diff)
Merge branch 'master' of github.com:rq/rq into watcher
-rw-r--r--  .github/workflows/dependencies.yml |   4
-rw-r--r--  .github/workflows/workflow.yml     |   3
-rw-r--r--  CHANGES.md                         |  12
-rw-r--r--  docs/docs/connections.md           | 101
-rw-r--r--  docs/docs/exceptions.md            |  33
-rw-r--r--  docs/docs/index.md                 |  15
-rw-r--r--  docs/docs/scheduling.md            |  23
-rw-r--r--  docs/docs/workers.md               |  15
-rw-r--r--  docs/patterns/index.md             |  21
-rw-r--r--  pyproject.toml                     |   4
-rw-r--r--  rq/__init__.py                     |   4
-rwxr-xr-x  rq/cli/cli.py                      |  40
-rw-r--r--  rq/cli/helpers.py                  |  31
-rw-r--r--  rq/connections.py                  |  48
-rw-r--r--  rq/defaults.py                     |   6
-rw-r--r--  rq/exceptions.py                   |   4
-rw-r--r--  rq/job.py                          | 123
-rw-r--r--  rq/local.py                        |  23
-rw-r--r--  rq/queue.py                        |  77
-rw-r--r--  rq/registry.py                     |  27
-rw-r--r--  rq/results.py                      |   9
-rw-r--r--  rq/scheduler.py                    |  40
-rw-r--r--  rq/timeouts.py                     |   4
-rw-r--r--  rq/utils.py                        |   9
-rw-r--r--  rq/version.py                      |   2
-rw-r--r--  rq/worker.py                       | 332
-rw-r--r--  rq/worker_registration.py          |   1
-rw-r--r--  tests/test_cli.py                  |  34
-rw-r--r--  tests/test_connection.py           |  33
-rw-r--r--  tests/test_dependencies.py         |  31
-rw-r--r--  tests/test_helpers.py              |  48
-rw-r--r--  tests/test_job.py                  |  33
-rw-r--r--  tests/test_registry.py             |   9
-rw-r--r--  tests/test_results.py              |  22
-rw-r--r--  tests/test_timeouts.py             |   7
-rw-r--r--  tests/test_worker.py               |  95
36 files changed, 933 insertions, 390 deletions
diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml
index b2b5915..0ce9588 100644
--- a/.github/workflows/dependencies.yml
+++ b/.github/workflows/dependencies.yml
@@ -26,7 +26,7 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Start Redis
- uses: supercharge/redis-github-action@1.4.0
+ uses: supercharge/redis-github-action@1.5.0
with:
redis-version: ${{ matrix.redis-version }}
@@ -61,7 +61,7 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Start Redis
- uses: supercharge/redis-github-action@1.4.0
+ uses: supercharge/redis-github-action@1.5.0
with:
redis-version: ${{ matrix.redis-version }}
diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml
index d089f80..586529d 100644
--- a/.github/workflows/workflow.yml
+++ b/.github/workflows/workflow.yml
@@ -28,7 +28,7 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Start Redis
- uses: supercharge/redis-github-action@1.4.0
+ uses: supercharge/redis-github-action@1.5.0
with:
redis-version: ${{ matrix.redis-version }}
@@ -47,3 +47,4 @@ jobs:
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
+ fail_ci_if_error: false
diff --git a/CHANGES.md b/CHANGES.md
index 6b6f4d0..1fd88ee 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,15 @@
+### RQ 1.13.0 (2023-02-19)
+* Added `work_horse_killed_handler` argument to `Worker`. Thanks @ronlut!
+* Fixed an issue where results aren't properly persisted on synchronous jobs. Thanks @selwin!
+* Fixed a bug where job results are not properly persisted when `result_ttl` is `-1`. Thanks @sim6!
+* Various documentation and logging fixes. Thanks @lowercase00!
+* Improve Redis connection reliability. Thanks @lowercase00!
+* Scheduler reliability improvements. Thanks @OlegZv and @lowercase00!
+* Fixed a bug where `dequeue_timeout` ignores `worker_ttl`. Thanks @ronlut!
+* Use `job.return_value()` instead of `job.result` when processing callbacks. Thanks @selwin!
+* Various internal refactorings to make `Worker` code more easily extendable. Thanks @lowercase00!
+* RQ's source code is now black formatted. Thanks @aparcar!
+
### RQ 1.12.0 (2023-01-15)
* RQ now stores multiple job execution results. This feature is only available on Redis >= 5.0 Redis Streams. Please refer to [the docs](https://python-rq.org/docs/results/) for more info. Thanks @selwin!
* Improve performance when enqueueing many jobs at once. Thanks @rggjan!
diff --git a/docs/docs/connections.md b/docs/docs/connections.md
index 8e563d9..4382cef 100644
--- a/docs/docs/connections.md
+++ b/docs/docs/connections.md
@@ -3,61 +3,20 @@ title: "RQ: Connections"
layout: docs
---
-Although RQ features the `use_connection()` command for convenience, it
-is deprecated, since it pollutes the global namespace. Instead, prefer explicit
-connection management using the `with Connection(...):` context manager, or
-pass in Redis connection references to queues directly.
+### The connection parameter
-
-## Single Redis connection (easy)
-
-<div class="warning">
- <img style="float: right; margin-right: -60px; margin-top: -38px" src="/img/warning.png" />
- <strong>Note:</strong>
- <p>
- The use of <code>use_connection</code> is deprecated.
- Please don't use <code>use_connection</code> in your scripts.
- Instead, use explicit connection management.
- </p>
-</div>
-
-In development mode, to connect to a default, local Redis server:
-
-```python
-from rq import use_connection
-use_connection()
-```
-
-In production, to connect to a specific Redis server:
+Each RQ object (queues, workers, jobs) has a `connection` keyword
+argument that can be passed to the constructor - this is the recommended way of handling connections.
```python
from redis import Redis
-from rq import use_connection
+from rq import Queue
-redis = Redis('my.host.org', 6789, password='secret')
-use_connection(redis)
+redis = Redis('localhost', 6789)
+q = Queue(connection=redis)
```
-Be aware of the fact that `use_connection` pollutes the global namespace. It
-also implies that you can only ever use a single connection.
-
-
-## Multiple Redis connections
-
-However, the single connection pattern facilitates only those cases where you
-connect to a single Redis instance, and where you affect global context (by
-replacing the existing connection with the `use_connection()` call). You can
-only use this pattern when you are in full control of your web stack.
-
-In any other situation, or when you want to use multiple connections, you
-should use `Connection` contexts or pass connections around explicitly.
-
-
-### Explicit connections (precise, but tedious)
-
-Each RQ object instance (queues, workers, jobs) has a `connection` keyword
-argument that can be passed to the constructor. Using this, you don't need to
-use `use_connection()`. Instead, you can create your queues like this:
+This pattern allows for different connections to be passed to different objects:
```python
from rq import Queue
@@ -73,11 +32,19 @@ q2 = Queue('bar', connection=conn2)
Every job that is enqueued on a queue will know what connection it belongs to.
The same goes for the workers.
-This approach is very precise, but rather verbose, and therefore, tedious.
-
### Connection contexts (precise and concise)
+<div class="warning">
+ <img style="float: right; margin-right: -60px; margin-top: -38px" src="/img/warning.png" />
+ <strong>Note:</strong>
+ <p>
+ The use of <code>Connection</code> context manager is deprecated.
+ Please don't use <code>Connection</code> in your scripts.
+ Instead, use explicit connection management.
+ </p>
+</div>
+
There is a better approach if you want to use multiple connections, though.
Each RQ object instance, upon creation, will use the topmost Redis connection
on the RQ connection stack, which is a mechanism to temporarily replace the
@@ -136,11 +103,21 @@ Using this setting in conjunction with the systemd or docker containers with the
automatic restart option allows workers and RQ to have a fault-tolerant connection to the redis.
```python
-SENTINEL: {'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],
- 'SOCKET_TIMEOUT': None,
- 'PASSWORD': 'secret',
- 'DB': 2,
- 'MASTER_NAME': 'master'}
+SENTINEL: {
+ 'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],
+ 'MASTER_NAME': 'master',
+ 'DB': 2,
+ 'USERNAME': 'redis-user',
+ 'PASSWORD': 'redis-secret',
+ 'SOCKET_TIMEOUT': None,
+    'CONNECTION_KWARGS': { # Additional Redis connection arguments
+ 'ssl_ca_path': None,
+ },
+    'SENTINEL_KWARGS': { # Additional Sentinel connection arguments
+ 'username': 'sentinel-user',
+ 'password': 'sentinel-secret',
+ },
+}
```
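For reference, a hedged sketch of resolving such a Sentinel configuration programmatically, using the helper the CLI itself uses (`rq.cli.helpers.get_redis_from_config`); the host names and secrets below are placeholders:

```python
# Sketch only: build a Redis client bound to the Sentinel master from an RQ settings dict.
from rq.cli.helpers import get_redis_from_config

settings = {
    'SENTINEL': {
        'INSTANCES': [('remote.host1.org', 26379), ('remote.host2.org', 26379)],
        'MASTER_NAME': 'master',
        'PASSWORD': 'redis-secret',
        'SENTINEL_KWARGS': {'username': 'sentinel-user', 'password': 'sentinel-secret'},
    }
}
redis = get_redis_from_config(settings)
```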
@@ -162,3 +139,17 @@ q = Queue(connection=conn)
Setting a `socket_timeout` with a lower value than the `default_worker_ttl` will cause a `TimeoutError`
since it will interrupt the worker while it gets new jobs from the queue.
+
+
+### Encoding / Decoding
+
+The encoding and decoding of Redis objects occur in multiple locations within the codebase,
+which means that the `decode_responses=True` argument of the Redis connection is not currently supported.
+
+```python
+from redis import Redis
+from rq import Queue
+
+conn = Redis(..., decode_responses=True) # This is not supported
+q = Queue(connection=conn)
+```
diff --git a/docs/docs/exceptions.md b/docs/docs/exceptions.md
index f757f2c..2ab8473 100644
--- a/docs/docs/exceptions.md
+++ b/docs/docs/exceptions.md
@@ -119,3 +119,36 @@ use a custom exception handler that doesn't fall through, for example:
def black_hole(job, *exc_info):
return False
```
+
+## Work Horse Killed Handler
+_New in version 1.13.0._
+
+In addition to job exception handler(s), RQ supports registering a handler for unexpected work horse termination.
+This handler is called when a work horse is terminated unexpectedly, for example due to an out-of-memory (OOM) kill.
+
+This is how you register a work horse termination handler on an RQ worker:
+
+```python
+from my_handlers import my_work_horse_killed_handler
+
+w = Worker([q], work_horse_killed_handler=my_work_horse_killed_handler)
+```
+
+The handler itself is a function that takes the following parameters: `job`,
+`retpid`, `ret_val` and `rusage`:
+
+```python
+from resource import struct_rusage
+from rq.job import Job
+def my_work_horse_killed_handler(job: Job, retpid: int, ret_val: int, rusage: struct_rusage):
+ # do your thing here, for example set job.retries_left to 0
+
+```
+
+## Built-in Exceptions
+RQ exceptions you may receive in your job failure callbacks:
+
+### AbandonedJobError
+This error means an unfinished job was collected by another worker's maintenance task.
+This usually happens when a worker is busy with a job and is terminated before it finishes that job.
+Another worker collects this job and moves it to the FailedJobRegistry. \ No newline at end of file
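As a hedged illustration (the callback name and the meta flag below are made up), a failure callback registered via `on_failure` could special-case this error:

```python
from rq.exceptions import AbandonedJobError

def report_failure(job, connection, exc_type, exc_value, traceback):
    # Failure callbacks receive the job, the connection and the exception triple.
    if exc_type is AbandonedJobError:
        # The job was reclaimed by another worker's maintenance task;
        # flag it so it can be inspected or re-submitted manually.
        job.meta['abandoned'] = True
        job.save_meta()
```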
diff --git a/docs/docs/index.md b/docs/docs/index.md
index bbf7841..1c24c6b 100644
--- a/docs/docs/index.md
+++ b/docs/docs/index.md
@@ -202,6 +202,21 @@ If you want to execute a function whenever a job completes or fails, RQ provides
queue.enqueue(say_hello, on_success=report_success, on_failure=report_failure)
```
+### Callback Class and Callback Timeouts
+
+_New in version 1.14.0_
+
+RQ lets you configure the method and timeout for each callback - success and failure.
+To configure callback timeouts, use RQ's
+`Callback` object that accepts `func` and `timeout` arguments. For example:
+
+```python
+from rq import Callback
+queue.enqueue(say_hello,
+ on_success=Callback(report_success), # default callback timeout (60 seconds)
+ on_failure=Callback(report_failure, timeout=10)) # 10 seconds timeout
+```
+
### Success Callback
Success callbacks must be a function that accepts `job`, `connection` and `result` arguments.
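For illustration, a minimal success callback matching that signature (the body is only a sketch):

```python
def report_success(job, connection, result):
    # Called by the worker after the job finishes successfully.
    print(f'Job {job.id} finished with result: {result!r}')
```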
diff --git a/docs/docs/scheduling.md b/docs/docs/scheduling.md
index 03aa1fa..cb297fa 100644
--- a/docs/docs/scheduling.md
+++ b/docs/docs/scheduling.md
@@ -95,8 +95,8 @@ from rq import Worker, Queue
from redis import Redis
redis = Redis()
-
queue = Queue(connection=redis)
+
worker = Worker(queues=[queue], connection=redis)
worker.work(with_scheduler=True)
```
@@ -113,5 +113,26 @@ working. This way, if a worker with active scheduler dies, the scheduling work w
up by other workers with the scheduling component enabled.
+## Safe importing of the worker module
+
+When running the worker programmatically with the scheduler, you must keep in mind that the
+import must be protected with `if __name__ == '__main__'`. The scheduler runs on its own process
+(using `multiprocessing` from the stdlib), so the newly spawned process must be able to safely import the module
+without causing any side effects (such as starting a new worker process on top of the main one).
+
+```python
+...
+
+# When running `with_scheduler=True` this is necessary
+if __name__ == '__main__':
+ worker = Worker(queues=[queue], connection=redis)
+ worker.work(with_scheduler=True)
+
+...
+# When running without the scheduler this is fine
+worker = Worker(queues=[queue], connection=redis)
+worker.work()
+```
+More information is available in the official Python docs [here](https://docs.python.org/3.7/library/multiprocessing.html#the-spawn-and-forkserver-start-methods).
diff --git a/docs/docs/workers.md b/docs/docs/workers.md
index d19451f..52b4ff3 100644
--- a/docs/docs/workers.md
+++ b/docs/docs/workers.md
@@ -69,6 +69,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
* `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'`
* `--disable-job-desc-logging`: Turn off job description logging.
* `--max-jobs`: Maximum number of jobs to execute.
+* `--dequeue-strategy`: The strategy to dequeue jobs from multiple queues (one of `default`, `random` or `round_robin`, defaults to `default`)
_New in version 1.8.0._
* `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer")
@@ -317,19 +318,21 @@ $ rq worker -w 'path.to.GeventWorker'
The default worker considers the order of queues as their priority order,
and if a task is pending in a higher priority queue
-it will be selected before any other in queues with lower priority.
+it will be selected before any other in queues with lower priority (the `default` behavior).
+To choose the strategy that should be used, `rq` provides the `--dequeue-strategy / -ds` option.
In certain circumstances it can be useful that, when a worker is listening to multiple queues,
say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st
dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th
-from `q1`, the 5th from `q2` and so on. The custom worker class `rq.worker.RoundRobinWorker`
-implements this strategy.
+from `q1`, the 5th from `q2` and so on. To use this strategy, pass the `-ds round_robin` argument.
-In some other circumstances, when a worker is listening to multiple queues, it can be useful
-to pull jobs from the different queues randomly. The custom class `rq.worker.RandomWorker`
-implements this strategy. In fact, whenever a job is pulled from any queue, the list of queues is
+In other circumstances, it can be useful to pull jobs from the different queues randomly.
+To use this strategy, pass the `-ds random` argument.
+In fact, whenever a job is pulled from any queue with the `random` strategy, the list of queues is
shuffled, so that no queue has more priority than the other ones.
+Deprecation Warning: These strategies were formerly implemented by the custom worker classes `rq.worker.RoundRobinWorker`
+and `rq.worker.RandomWorker`. Since the `--dequeue-strategy` argument makes these strategies available to any worker, those worker classes are deprecated and will be removed in future versions.
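A hedged sketch of choosing the strategy programmatically, assuming `Worker.work()` accepts the same `dequeue_strategy` value the CLI passes through (queue names are placeholders):

```python
from redis import Redis
from rq import Queue, Worker

redis = Redis()
queues = [Queue(name, connection=redis) for name in ('q1', 'q2', 'q3')]
worker = Worker(queues, connection=redis)
worker.work(dequeue_strategy='round_robin')  # or 'random' / 'default'
```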
## Custom Job and Queue Classes
diff --git a/docs/patterns/index.md b/docs/patterns/index.md
index 23f67a4..818dc14 100644
--- a/docs/patterns/index.md
+++ b/docs/patterns/index.md
@@ -12,24 +12,24 @@ To setup RQ on [Heroku][1], first add it to your
rq>=0.13
Create a file called `run-worker.py` with the following content (assuming you
-are using [Redis To Go][2] with Heroku):
+are using [Heroku Data For Redis][2] with Heroku):
```python
import os
-import urlparse
+import redis
from redis import Redis
from rq import Queue, Connection
from rq.worker import HerokuWorker as Worker
+
listen = ['high', 'default', 'low']
-redis_url = os.getenv('REDISTOGO_URL')
+redis_url = os.getenv('REDIS_URL')
if not redis_url:
- raise RuntimeError('Set up Redis To Go first.')
-
-urlparse.uses_netloc.append('redis')
-url = urlparse.urlparse(redis_url)
-conn = Redis(host=url.hostname, port=url.port, db=0, password=url.password)
+ raise RuntimeError("Set up Heroku Data For Redis first, \
+ make sure its config var is named 'REDIS_URL'.")
+
+conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
@@ -47,7 +47,7 @@ Now, all you have to do is spin up a worker:
$ heroku scale worker=1
```
-If you are using [Heroku Redis][5]) you might need to change the Redis connection as follows:
+If the `from_url` function fails to parse your credentials, you might need to do so manually:
```console
conn = redis.Redis(
@@ -58,6 +58,7 @@ conn = redis.Redis(
ssl_cert_reqs=None
)
```
+These details can be found on the 'Settings' page of your Redis add-on in the Heroku dashboard.
and for using the cli:
@@ -90,7 +91,7 @@ force stdin, stdout and stderr to be totally unbuffered):
worker: python -u run-worker.py
[1]: https://heroku.com
-[2]: https://devcenter.heroku.com/articles/redistogo
+[2]: https://devcenter.heroku.com/articles/heroku-redis
[3]: https://github.com/ddollar/foreman
[4]: https://github.com/ddollar/foreman/wiki/Missing-Output
[5]: https://elements.heroku.com/addons/heroku-redis
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..4787b13
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,4 @@
+[tool.black]
+line-length = 120
+target-version = ['py36']
+skip-string-normalization = true \ No newline at end of file
diff --git a/rq/__init__.py b/rq/__init__.py
index d5db681..0ab7065 100644
--- a/rq/__init__.py
+++ b/rq/__init__.py
@@ -1,7 +1,7 @@
# flake8: noqa
-from .connections import Connection, get_current_connection, pop_connection, push_connection, use_connection
-from .job import cancel_job, get_current_job, requeue_job, Retry
+from .connections import Connection, get_current_connection, pop_connection, push_connection
+from .job import cancel_job, get_current_job, requeue_job, Retry, Callback
from .queue import Queue
from .version import VERSION
from .worker import SimpleWorker, Worker
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index f781010..27058e8 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -5,6 +5,7 @@ RQ command line tool
from functools import update_wrapper
import os
import sys
+import warnings
import click
from redis.exceptions import ConnectionError
@@ -32,7 +33,7 @@ from rq.defaults import (
DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT,
- DEFAULT_SERIALIZER_CLASS,
+    DEFAULT_SERIALIZER_CLASS,
+    DEFAULT_MAINTENANCE_TASK_INTERVAL,
)
from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries
@@ -104,7 +105,8 @@ def empty(cli_config, all, queues, serializer, **options):
if all:
queues = cli_config.queue_class.all(
- connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
+ connection=cli_config.connection, job_class=cli_config.job_class,
+ death_penalty_class=cli_config.death_penalty_class, serializer=serializer
)
else:
queues = [
@@ -174,7 +176,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
try:
with Connection(cli_config.connection):
-
if queues:
qs = list(map(cli_config.queue_class, queues))
else:
@@ -200,7 +201,13 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
-@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used')
+@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Worker timeout to be used')
+@click.option(
+ '--maintenance-interval',
+ type=int,
+ default=DEFAULT_MAINTENANCE_TASK_INTERVAL,
+ help='Maintenance task interval (in seconds) to be used'
+)
@click.option(
'--job-monitoring-interval',
type=int,
@@ -217,8 +224,10 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
+@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
+@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom strategy to dequeue from multiple queues')
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(
@@ -228,6 +237,7 @@ def worker(
name,
results_ttl,
worker_ttl,
+ maintenance_interval,
job_monitoring_interval,
disable_job_desc_logging,
verbose,
@@ -239,12 +249,14 @@ def worker(
pid,
disable_default_exception_handler,
max_jobs,
+ max_idle_time,
with_scheduler,
queues,
log_format,
date_format,
serializer,
- **options
+ dequeue_strategy,
+ **options,
):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
@@ -259,6 +271,17 @@ def worker(
with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid()))
+ worker_name = cli_config.worker_class.__qualname__
+ if worker_name in ["RoundRobinWorker", "RandomWorker"]:
+ strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin"
+ msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy."
+ warnings.warn(msg, DeprecationWarning)
+ click.secho(msg, fg='yellow')
+
+ if dequeue_strategy not in ("default", "random", "round_robin"):
+ click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red')
+ sys.exit(1)
+
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
try:
@@ -283,13 +306,14 @@ def worker(
connection=cli_config.connection,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
+ maintenance_interval=maintenance_interval,
job_monitoring_interval=job_monitoring_interval,
job_class=cli_config.job_class,
queue_class=cli_config.queue_class,
exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler,
log_job_description=not disable_job_desc_logging,
- serializer=serializer,
+ serializer=serializer
)
# Should we configure Sentry?
@@ -309,7 +333,9 @@ def worker(
date_format=date_format,
log_format=log_format,
max_jobs=max_jobs,
+ max_idle_time=max_idle_time,
with_scheduler=with_scheduler,
+ dequeue_strategy=dequeue_strategy
)
except ConnectionError as e:
print(e)
@@ -391,7 +417,7 @@ def enqueue(
serializer,
function,
arguments,
- **options
+ **options,
):
"""Enqueues a job from the command line"""
args, kwargs = parse_function_args(arguments)
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index fb20109..d7238ed 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -13,7 +13,8 @@ from shutil import get_terminal_size
import click
from redis import Redis
from redis.sentinel import Sentinel
-from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS
+from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, \
+ DEFAULT_DEATH_PENALTY_CLASS
from rq.logutils import setup_loghandlers
from rq.utils import import_attribute, parse_timeout
from rq.worker import WorkerStatus
@@ -33,21 +34,27 @@ def get_redis_from_config(settings, connection_class=Redis):
"""Returns a StrictRedis instance from a dictionary of settings.
To use redis sentinel, you must specify a dictionary in the configuration file.
Example of a dictionary with keys without values:
- SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'PASSWORD':,'DB':, 'MASTER_NAME':}
+ SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'USERNAME':, 'PASSWORD':, 'DB':, 'MASTER_NAME':, 'SENTINEL_KWARGS':}
"""
if settings.get('REDIS_URL') is not None:
return connection_class.from_url(settings['REDIS_URL'])
elif settings.get('SENTINEL') is not None:
instances = settings['SENTINEL'].get('INSTANCES', [('localhost', 26379)])
- socket_timeout = settings['SENTINEL'].get('SOCKET_TIMEOUT', None)
- password = settings['SENTINEL'].get('PASSWORD', None)
- db = settings['SENTINEL'].get('DB', 0)
master_name = settings['SENTINEL'].get('MASTER_NAME', 'mymaster')
- ssl = settings['SENTINEL'].get('SSL', False)
- arguments = {'password': password, 'ssl': ssl}
+
+ connection_kwargs = {
+ 'db': settings['SENTINEL'].get('DB', 0),
+ 'username': settings['SENTINEL'].get('USERNAME', None),
+ 'password': settings['SENTINEL'].get('PASSWORD', None),
+ 'socket_timeout': settings['SENTINEL'].get('SOCKET_TIMEOUT', None),
+ 'ssl': settings['SENTINEL'].get('SSL', False),
+ }
+ connection_kwargs.update(settings['SENTINEL'].get('CONNECTION_KWARGS', {}))
+ sentinel_kwargs = settings['SENTINEL'].get('SENTINEL_KWARGS', {})
+
sn = Sentinel(
- instances, socket_timeout=socket_timeout, password=password, db=db, ssl=ssl, sentinel_kwargs=arguments
+ instances, sentinel_kwargs=sentinel_kwargs, **connection_kwargs
)
return sn.master_for(master_name)
@@ -100,7 +107,6 @@ def state_symbol(state):
def show_queues(queues, raw, by_queue, queue_class, worker_class):
-
num_jobs = 0
termwidth = get_terminal_size().columns
chartwidth = min(20, termwidth - 20)
@@ -141,7 +147,6 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
workers.add(worker)
if not by_queue:
-
for worker in workers:
queue_names = ', '.join(worker.queue_names())
name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid)
@@ -298,6 +303,7 @@ class CliConfig:
config=None,
worker_class=DEFAULT_WORKER_CLASS,
job_class=DEFAULT_JOB_CLASS,
+ death_penalty_class=DEFAULT_DEATH_PENALTY_CLASS,
queue_class=DEFAULT_QUEUE_CLASS,
connection_class=DEFAULT_CONNECTION_CLASS,
path=None,
@@ -322,6 +328,11 @@ class CliConfig:
raise click.BadParameter(str(exc), param_hint='--job-class')
try:
+ self.death_penalty_class = import_attribute(death_penalty_class)
+ except (ImportError, AttributeError) as exc:
+ raise click.BadParameter(str(exc), param_hint='--death-penalty-class')
+
+ try:
self.queue_class = import_attribute(queue_class)
except (ImportError, AttributeError) as exc:
raise click.BadParameter(str(exc), param_hint='--queue-class')
diff --git a/rq/connections.py b/rq/connections.py
index 413ee5a..0d39e43 100644
--- a/rq/connections.py
+++ b/rq/connections.py
@@ -1,9 +1,10 @@
+import warnings
from contextlib import contextmanager
from typing import Optional
-import warnings
+
from redis import Redis
-from .local import LocalStack, release_local
+from .local import LocalStack
class NoRedisConnectionException(Exception):
@@ -31,7 +32,8 @@ def Connection(connection: Optional['Redis'] = None): # noqa
connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None.
"""
warnings.warn(
- "The Conneciton context manager is deprecated. Use the `connection` parameter instead.", DeprecationWarning
+ "The Connection context manager is deprecated. Use the `connection` parameter instead.",
+ DeprecationWarning,
)
if connection is None:
connection = Redis()
@@ -41,7 +43,8 @@ def Connection(connection: Optional['Redis'] = None): # noqa
finally:
popped = pop_connection()
assert popped == connection, (
- 'Unexpected Redis connection was popped off the stack. ' 'Check your Redis connection setup.'
+ 'Unexpected Redis connection was popped off the stack. '
+ 'Check your Redis connection setup.'
)
@@ -52,6 +55,10 @@ def push_connection(redis: 'Redis'):
Args:
redis (Redis): A Redis connection
"""
+ warnings.warn(
+ "The `push_connection` function is deprecated. Pass the `connection` explicitly instead.",
+ DeprecationWarning,
+ )
_connection_stack.push(redis)
@@ -62,25 +69,13 @@ def pop_connection() -> 'Redis':
Returns:
redis (Redis): A Redis connection
"""
+ warnings.warn(
+ "The `pop_connection` function is deprecated. Pass the `connection` explicitly instead.",
+ DeprecationWarning,
+ )
return _connection_stack.pop()
-def use_connection(redis: Optional['Redis'] = None):
- """
- Clears the stack and uses the given connection. Protects against mixed
- use of use_connection() and stacked connection contexts.
-
- Args:
- redis (Optional[Redis], optional): A Redis Connection. Defaults to None.
- """
- assert len(_connection_stack) <= 1, 'You should not mix Connection contexts with use_connection()'
- release_local(_connection_stack)
-
- if redis is None:
- redis = Redis()
- push_connection(redis)
-
-
def get_current_connection() -> 'Redis':
"""
Returns the current Redis connection (i.e. the topmost on the
@@ -89,6 +84,10 @@ def get_current_connection() -> 'Redis':
Returns:
Redis: A Redis Connection
"""
+ warnings.warn(
+ "The `get_current_connection` function is deprecated. Pass the `connection` explicitly instead.",
+ DeprecationWarning,
+ )
return _connection_stack.top
@@ -106,7 +105,10 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis':
Returns:
Redis: A Redis Connection
"""
-
+ warnings.warn(
+ "The `resolve_connection` function is deprecated. Pass the `connection` explicitly instead.",
+ DeprecationWarning,
+ )
if connection is not None:
return connection
@@ -118,4 +120,6 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis':
_connection_stack = LocalStack()
-__all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection', 'use_connection']
+
+__all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection']
+
diff --git a/rq/defaults.py b/rq/defaults.py
index bd50489..2a3d57a 100644
--- a/rq/defaults.py
+++ b/rq/defaults.py
@@ -88,3 +88,9 @@ DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
Uses Python's default attributes as defined
https://docs.python.org/3/library/logging.html#logrecord-attributes
"""
+
+
+DEFAULT_DEATH_PENALTY_CLASS = 'rq.timeouts.UnixSignalDeathPenalty'
+""" The path for the default Death Penalty class to use.
+Defaults to the `UnixSignalDeathPenalty` class within the `rq.timeouts` module
+""" \ No newline at end of file
diff --git a/rq/exceptions.py b/rq/exceptions.py
index ee51753..b84ade1 100644
--- a/rq/exceptions.py
+++ b/rq/exceptions.py
@@ -30,3 +30,7 @@ class ShutDownImminentException(Exception):
class TimeoutFormatError(Exception):
pass
+
+
+class AbandonedJobError(Exception):
+ pass
diff --git a/rq/job.py b/rq/job.py
index 81ed98e..07ec6cb 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -1,16 +1,18 @@
import inspect
import json
+import logging
import warnings
import zlib
import asyncio
-from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from enum import Enum
from redis import WatchError
-from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, Type
from uuid import uuid4
+from .defaults import CALLBACK_TIMEOUT
+from .timeouts import JobTimeoutException, BaseDeathPenalty
if TYPE_CHECKING:
from .results import Result
@@ -36,6 +38,8 @@ from .utils import (
utcnow,
)
+logger = logging.getLogger("rq.job")
+
class JobStatus(str, Enum):
"""The Status of Job within its lifecycle at any given time."""
@@ -153,8 +157,8 @@ class Job:
failure_ttl: Optional[int] = None,
serializer=None,
*,
- on_success: Optional[Callable[..., Any]] = None,
- on_failure: Optional[Callable[..., Any]] = None
+ on_success: Optional[Union['Callback', Callable[..., Any]]] = None,
+ on_failure: Optional[Union['Callback', Callable[..., Any]]] = None
) -> 'Job':
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
@@ -234,14 +238,20 @@ class Job:
job._kwargs = kwargs
if on_success:
- if not inspect.isfunction(on_success) and not inspect.isbuiltin(on_success):
- raise ValueError('on_success callback must be a function')
- job._success_callback_name = '{0}.{1}'.format(on_success.__module__, on_success.__qualname__)
+ if not isinstance(on_success, Callback):
+ warnings.warn('Passing a `Callable` `on_success` is deprecated, pass `Callback` instead',
+ DeprecationWarning)
+ on_success = Callback(on_success) # backward compatibility
+ job._success_callback_name = on_success.name
+ job._success_callback_timeout = on_success.timeout
if on_failure:
- if not inspect.isfunction(on_failure) and not inspect.isbuiltin(on_failure):
- raise ValueError('on_failure callback must be a function')
- job._failure_callback_name = '{0}.{1}'.format(on_failure.__module__, on_failure.__qualname__)
+ if not isinstance(on_failure, Callback):
+ warnings.warn('Passing a `Callable` `on_failure` is deprecated, pass `Callback` instead',
+ DeprecationWarning)
+ on_failure = Callback(on_failure) # backward compatibility
+ job._failure_callback_name = on_failure.name
+ job._failure_callback_timeout = on_failure.timeout
# Extra meta data
job.description = description or job.get_call_string()
@@ -254,12 +264,17 @@ class Job:
# dependency could be job instance or id, or iterable thereof
if depends_on is not None:
- if isinstance(depends_on, Dependency):
- job.enqueue_at_front = depends_on.enqueue_at_front
- job.allow_dependency_failures = depends_on.allow_failure
- depends_on_list = depends_on.dependencies
- else:
- depends_on_list = ensure_list(depends_on)
+ depends_on = ensure_list(depends_on)
+ depends_on_list = []
+ for depends_on_item in depends_on:
+ if isinstance(depends_on_item, Dependency):
+ # If a Dependency has enqueue_at_front or allow_failure set to True, these behaviors are used for
+ # all dependencies.
+ job.enqueue_at_front = job.enqueue_at_front or depends_on_item.enqueue_at_front
+ job.allow_dependency_failures = job.allow_dependency_failures or depends_on_item.allow_failure
+ depends_on_list.extend(depends_on_item.dependencies)
+ else:
+ depends_on_list.extend(ensure_list(depends_on_item))
job._dependency_ids = [dep.id if isinstance(dep, Job) else dep for dep in depends_on_list]
return job
@@ -397,6 +412,13 @@ class Job:
return self._success_callback
@property
+ def success_callback_timeout(self) -> int:
+ if self._success_callback_timeout is None:
+ return CALLBACK_TIMEOUT
+
+ return self._success_callback_timeout
+
+ @property
def failure_callback(self):
if self._failure_callback is UNEVALUATED:
if self._failure_callback_name:
@@ -406,6 +428,13 @@ class Job:
return self._failure_callback
+ @property
+ def failure_callback_timeout(self) -> int:
+ if self._failure_callback_timeout is None:
+ return CALLBACK_TIMEOUT
+
+ return self._failure_callback_timeout
+
def _deserialize_data(self):
"""Deserializes the Job `data` into a tuple.
This includes the `_func_name`, `_instance`, `_args` and `_kwargs`
@@ -575,6 +604,8 @@ class Job:
self._result = None
self._exc_info = None
self.timeout: Optional[float] = None
+ self._success_callback_timeout: Optional[int] = None
+ self._failure_callback_timeout: Optional[int] = None
self.result_ttl: Optional[int] = None
self.failure_ttl: Optional[int] = None
self.ttl: Optional[int] = None
@@ -862,9 +893,15 @@ class Job:
if obj.get('success_callback_name'):
self._success_callback_name = obj.get('success_callback_name').decode()
+ if 'success_callback_timeout' in obj:
+ self._success_callback_timeout = int(obj.get('success_callback_timeout'))
+
if obj.get('failure_callback_name'):
self._failure_callback_name = obj.get('failure_callback_name').decode()
+ if 'failure_callback_timeout' in obj:
+ self._failure_callback_timeout = int(obj.get('failure_callback_timeout'))
+
dep_ids = obj.get('dependency_ids')
dep_id = obj.get('dependency_id') # for backwards compatibility
self._dependency_ids = json.loads(dep_ids.decode()) if dep_ids else [dep_id.decode()] if dep_id else []
@@ -942,6 +979,10 @@ class Job:
obj['exc_info'] = zlib.compress(str(self._exc_info).encode('utf-8'))
if self.timeout is not None:
obj['timeout'] = self.timeout
+ if self._success_callback_timeout is not None:
+ obj['success_callback_timeout'] = self._success_callback_timeout
+ if self._failure_callback_timeout is not None:
+ obj['failure_callback_timeout'] = self._failure_callback_timeout
if self.result_ttl is not None:
obj['result_ttl'] = self.result_ttl
if self.failure_ttl is not None:
@@ -1303,6 +1344,35 @@ class Job:
self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
)
+ def execute_success_callback(self, death_penalty_class: Type[BaseDeathPenalty], result: Any):
+ """Executes success_callback for a job.
+ with timeout .
+
+ Args:
+ death_penalty_class (Type[BaseDeathPenalty]): The penalty class to use for timeout
+ result (Any): The job's result.
+ """
+ if not self.success_callback:
+ return
+
+ logger.debug('Running success callbacks for %s', self.id)
+ with death_penalty_class(self.success_callback_timeout, JobTimeoutException, job_id=self.id):
+ self.success_callback(self, self.connection, result)
+
+ def execute_failure_callback(self, death_penalty_class: Type[BaseDeathPenalty], *exc_info):
+ """Executes failure_callback with possible timeout
+ """
+ if not self.failure_callback:
+ return
+
+ logger.debug('Running failure callbacks for %s', self.id)
+ try:
+ with death_penalty_class(self.failure_callback_timeout, JobTimeoutException, job_id=self.id):
+ self.failure_callback(self, self.connection, *exc_info)
+ except Exception: # noqa
+ logger.exception(f'Job {self.id}: error while executing failure callback')
+ raise
+
def _handle_success(self, result_ttl: int, pipeline: 'Pipeline'):
"""Saves and cleanup job after successful execution"""
# self.log.debug('Setting job %s status to finished', job.id)
@@ -1317,9 +1387,8 @@ class Job:
# for backward compatibility
if self.supports_redis_streams:
from .results import Result
- Result.create(
- self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline
- )
+
+ Result.create(self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline)
if result_ttl != 0:
finished_job_registry = self.finished_job_registry
@@ -1339,6 +1408,7 @@ class Job:
)
if self.supports_redis_streams:
from .results import Result
+
Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline)
def get_retry_interval(self) -> int:
@@ -1372,7 +1442,7 @@ class Job:
self.set_status(JobStatus.SCHEDULED)
queue.schedule_job(self, scheduled_datetime, pipeline=pipeline)
else:
- queue.enqueue_job(self, pipeline=pipeline)
+ queue._enqueue_job(self, pipeline=pipeline)
def register_dependency(self, pipeline: Optional['Pipeline'] = None):
"""Jobs may have dependencies. Jobs are enqueued only if the jobs they
@@ -1502,3 +1572,16 @@ class Retry:
self.max = max
self.intervals = intervals
+
+
+class Callback:
+ def __init__(self, func: Callable[..., Any], timeout: Optional[Any] = None):
+ if not inspect.isfunction(func) and not inspect.isbuiltin(func):
+ raise ValueError('Callback func must be a function')
+
+ self.func = func
+ self.timeout = parse_timeout(timeout) if timeout else CALLBACK_TIMEOUT
+
+ @property
+ def name(self) -> str:
+ return '{0}.{1}'.format(self.func.__module__, self.func.__qualname__)
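For context, a hedged usage sketch of the new `Callback` wrapper; the task and callback functions are placeholders, and `timeout` also accepts the string forms understood by `parse_timeout`:

```python
from redis import Redis
from rq import Callback, Queue

def my_task():
    return 42

def report_success(job, connection, result):
    print(f'{job.id} -> {result!r}')

def report_failure(job, connection, exc_type, exc_value, traceback):
    print(f'{job.id} failed: {exc_value}')

q = Queue(connection=Redis())
q.enqueue(
    my_task,
    on_success=Callback(report_success),                 # falls back to CALLBACK_TIMEOUT
    on_failure=Callback(report_failure, timeout='10s'),  # parse_timeout handles '10s'
)
```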
diff --git a/rq/local.py b/rq/local.py
index 8e94457..e6b070b 100644
--- a/rq/local.py
+++ b/rq/local.py
@@ -15,7 +15,7 @@ try:
from greenlet import getcurrent as get_ident
except ImportError: # noqa
try:
- from thread import get_ident # noqa
+ from threading import get_ident # noqa
except ImportError: # noqa
try:
from _thread import get_ident # noqa
@@ -296,18 +296,6 @@ class LocalProxy:
return '<%s unbound>' % self.__class__.__name__
return repr(obj)
- def __nonzero__(self):
- try:
- return bool(self._get_current_object())
- except RuntimeError:
- return False
-
- def __unicode__(self):
- try:
- return unicode(self._get_current_object())
- except RuntimeError:
- return repr(self)
-
def __dir__(self):
try:
return dir(self._get_current_object())
@@ -325,12 +313,6 @@ class LocalProxy:
def __delitem__(self, key):
del self._get_current_object()[key]
- def __setslice__(self, i, j, seq):
- self._get_current_object()[i:j] = seq
-
- def __delslice__(self, i, j):
- del self._get_current_object()[i:j]
-
__setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
__delattr__ = lambda x, n: delattr(x._get_current_object(), n)
__str__ = lambda x: str(x._get_current_object())
@@ -340,14 +322,12 @@ class LocalProxy:
__ne__ = lambda x, o: x._get_current_object() != o
__gt__ = lambda x, o: x._get_current_object() > o
__ge__ = lambda x, o: x._get_current_object() >= o
- __cmp__ = lambda x, o: cmp(x._get_current_object(), o)
__hash__ = lambda x: hash(x._get_current_object())
__call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
__len__ = lambda x: len(x._get_current_object())
__getitem__ = lambda x, i: x._get_current_object()[i]
__iter__ = lambda x: iter(x._get_current_object())
__contains__ = lambda x, i: i in x._get_current_object()
- __getslice__ = lambda x, i, j: x._get_current_object()[i:j]
__add__ = lambda x, o: x._get_current_object() + o
__sub__ = lambda x, o: x._get_current_object() - o
__mul__ = lambda x, o: x._get_current_object() * o
@@ -373,6 +353,5 @@ class LocalProxy:
__oct__ = lambda x: oct(x._get_current_object())
__hex__ = lambda x: hex(x._get_current_object())
__index__ = lambda x: x._get_current_object().__index__()
- __coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o)
__enter__ = lambda x: x._get_current_object().__enter__()
__exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
diff --git a/rq/queue.py b/rq/queue.py
index 3c483e8..77a6f3e 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -10,6 +10,8 @@ from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Ty
from redis import WatchError
+from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty
+
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
@@ -62,13 +64,15 @@ class EnqueueData(
@total_ordering
class Queue:
job_class: Type['Job'] = Job
+ death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty
DEFAULT_TIMEOUT: int = 180 # Default timeout seconds.
redis_queue_namespace_prefix: str = 'rq:queue:'
redis_queues_keys: str = 'rq:queues'
@classmethod
def all(
- cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None
+ cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None,
+ serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None
) -> List['Queue']:
"""Returns an iterable of all Queues.
@@ -76,6 +80,7 @@ class Queue:
connection (Optional[Redis], optional): The Redis Connection. Defaults to None.
job_class (Optional[Job], optional): The Job class to use. Defaults to None.
serializer (optional): The serializer to use. Defaults to None.
+            death_penalty_class (Optional[Type[BaseDeathPenalty]], optional): The Death Penalty class to use. Defaults to None.
Returns:
queues (List[Queue]): A list of all queues.
@@ -84,7 +89,8 @@ class Queue:
def to_queue(queue_key):
return cls.from_queue_key(
- as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer
+ as_text(queue_key), connection=connection, job_class=job_class,
+ serializer=serializer, death_penalty_class=death_penalty_class
)
all_registerd_queues = connection.smembers(cls.redis_queues_keys)
@@ -96,8 +102,9 @@ class Queue:
cls,
queue_key: str,
connection: Optional['Redis'] = None,
- job_class: Optional['Job'] = None,
+ job_class: Optional[Type['Job']] = None,
serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
@@ -108,6 +115,7 @@ class Queue:
connection (Optional[Redis], optional): Redis connection. Defaults to None.
job_class (Optional[Job], optional): Job class. Defaults to None.
serializer (Any, optional): Serializer. Defaults to None.
+ death_penalty_class (Optional[BaseDeathPenalty], optional): Death penalty class. Defaults to None.
Raises:
ValueError: If the queue_key doesn't start with the defined prefix
@@ -119,7 +127,8 @@ class Queue:
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
name = queue_key[len(prefix):]
- return cls(name, connection=connection, job_class=job_class, serializer=serializer)
+ return cls(name, connection=connection, job_class=job_class, serializer=serializer,
+ death_penalty_class=death_penalty_class)
def __init__(
self,
@@ -129,6 +138,7 @@ class Queue:
is_async: bool = True,
job_class: Union[str, Type['Job'], None] = None,
serializer: Any = None,
+ death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty,
**kwargs,
):
"""Initializes a Queue object.
@@ -141,6 +151,7 @@ class Queue:
If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True.
job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None.
serializer (Any, optional): Serializer. Defaults to None.
+            death_penalty_class (Type[BaseDeathPenalty], optional): Death penalty class used to enforce job timeouts. Defaults to UnixSignalDeathPenalty.
"""
self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix
@@ -159,6 +170,7 @@ class Queue:
if isinstance(job_class, str):
job_class = import_attribute(job_class)
self.job_class = job_class
+ self.death_penalty_class = death_penalty_class
self.serializer = resolve_serializer(serializer)
self.redis_server_version: Optional[Tuple[int, int, int]] = None
@@ -166,9 +178,6 @@ class Queue:
def __len__(self):
return self.count
- def __nonzero__(self):
- return True
-
def __bool__(self):
return True
@@ -336,7 +345,7 @@ class Queue:
else:
end = length
job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)]
- self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.")
+ self.log.debug('Getting jobs for queue %s: %d found.', green(self.name), len(job_ids))
return job_ids
def get_jobs(self, offset: int = 0, length: int = -1) -> List['Job']:
@@ -455,7 +464,7 @@ class Queue:
result = connection.lpush(self.key, job_id)
else:
result = connection.rpush(self.key, job_id)
- self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.")
+ self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result)
def create_job(
self,
@@ -663,12 +672,7 @@ class Queue:
on_success=on_success,
on_failure=on_failure,
)
-
- job = self.setup_dependencies(job, pipeline=pipeline)
- # If we do not depend on an unfinished job, enqueue the job.
- if job.get_status(refresh=False) != JobStatus.DEFERRED:
- return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
- return job
+ return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
@staticmethod
def prepare_data(
@@ -739,7 +743,7 @@ class Queue:
"""
pipe = pipeline if pipeline is not None else self.connection.pipeline()
jobs = [
- self.enqueue_job(
+ self._enqueue_job(
self.create_job(
job_data.func,
args=job_data.args,
@@ -980,7 +984,26 @@ class Queue:
return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs)
def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
- """Enqueues a job for delayed execution.
+ """Enqueues a job for delayed execution checking dependencies.
+
+ Args:
+ job (Job): The job to enqueue
+ pipeline (Optional[Pipeline], optional): The Redis pipeline to use. Defaults to None.
+            at_front (bool, optional): Whether the job should be enqueued at the front of the queue. Defaults to False.
+
+ Returns:
+            Job: The enqueued job
+ """
+ job.origin = self.name
+ job = self.setup_dependencies(job, pipeline=pipeline)
+ # If we do not depend on an unfinished job, enqueue the job.
+ if job.get_status(refresh=False) != JobStatus.DEFERRED:
+ return self._enqueue_job(job, pipeline=pipeline, at_front=at_front)
+ return job
+
+
+ def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
+ """Enqueues a job for delayed execution without checking dependencies.
If Queue is instantiated with is_async=False, job is executed immediately.
@@ -1067,7 +1090,6 @@ class Queue:
dependents_key = job.dependents_key
while True:
-
try:
# if a pipeline is passed, the caller is responsible for calling WATCH
# to ensure all jobs are enqueued
@@ -1107,10 +1129,10 @@ class Queue:
registry.remove(dependent, pipeline=pipe)
if dependent.origin == self.name:
- self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
+ self._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
else:
queue = self.__class__(name=dependent.origin, connection=self.connection)
- queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
+ queue._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
# Only delete dependents_key if all dependents have been enqueued
if len(jobs_to_enqueue) == len(dependent_job_ids):
@@ -1140,7 +1162,7 @@ class Queue:
return as_text(self.connection.lpop(self.key))
@classmethod
- def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None):
+ def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None):
"""Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to
@@ -1155,7 +1177,7 @@ class Queue:
Args:
queue_keys (_type_): _description_
- timeout (int): _description_
+ timeout (Optional[int]): _description_
connection (Optional[Redis], optional): _description_. Defaults to None.
Raises:
@@ -1188,10 +1210,11 @@ class Queue:
def dequeue_any(
cls,
queues: List['Queue'],
- timeout: int,
+ timeout: Optional[int],
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.
@@ -1205,10 +1228,11 @@ class Queue:
Args:
queues (List[Queue]): List of queue objects
- timeout (int): Timeout for the LPOP
+ timeout (Optional[int]): Timeout for the LPOP
connection (Optional[Redis], optional): Redis Connection. Defaults to None.
- job_class (Optional[Job], optional): The job classification. Defaults to None.
+ job_class (Optional[Type[Job]], optional): The job class. Defaults to None.
serializer (Any, optional): Serializer to use. Defaults to None.
+ death_penalty_class (Optional[Type[BaseDeathPenalty]], optional): The death penalty class. Defaults to None.
Raises:
e: Any exception
@@ -1224,7 +1248,8 @@ class Queue:
if result is None:
return None
queue_key, job_id = map(as_text, result)
- queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, serializer=serializer)
+ queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class,
+ serializer=serializer, death_penalty_class=death_penalty_class)
try:
job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError:
diff --git a/rq/registry.py b/rq/registry.py
index 509bd87..2bd874c 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -1,22 +1,30 @@
import calendar
+import logging
+import traceback
+
from rq.serializers import resolve_serializer
import time
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, List, Optional, Type, Union
+from .timeouts import JobTimeoutException, UnixSignalDeathPenalty, BaseDeathPenalty
+
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
from .utils import as_text
from .connections import resolve_connection
-from .defaults import DEFAULT_FAILURE_TTL
-from .exceptions import InvalidJobOperation, NoSuchJobError
+from .defaults import DEFAULT_FAILURE_TTL, CALLBACK_TIMEOUT
+from .exceptions import InvalidJobOperation, NoSuchJobError, AbandonedJobError
from .job import Job, JobStatus
from .queue import Queue
from .utils import backend_class, current_timestamp
+logger = logging.getLogger("rq.registry")
+
+
class BaseRegistry:
"""
Base implementation of a job registry, implemented in Redis sorted set.
@@ -25,6 +33,7 @@ class BaseRegistry:
"""
job_class = Job
+ death_penalty_class = UnixSignalDeathPenalty
key_template = 'rq:registry:{0}'
def __init__(
@@ -34,6 +43,7 @@ class BaseRegistry:
job_class: Optional[Type['Job']] = None,
queue: Optional['Queue'] = None,
serializer: Any = None,
+ death_penalty_class: Optional[Type[BaseDeathPenalty]] = None,
):
if queue:
self.name = queue.name
@@ -46,6 +56,7 @@ class BaseRegistry:
self.key = self.key_template.format(self.name)
self.job_class = backend_class(self, 'job_class', override=job_class)
+ self.death_penalty_class = backend_class(self, 'death_penalty_class', override=death_penalty_class)
def __len__(self):
"""Returns the number of jobs in this registry"""
@@ -186,7 +197,7 @@ class BaseRegistry:
job.ended_at = None
job._exc_info = ''
job.save()
- job = queue.enqueue_job(job, pipeline=pipeline, at_front=at_front)
+ job = queue._enqueue_job(job, pipeline=pipeline, at_front=at_front)
pipeline.execute()
return job
@@ -204,7 +215,7 @@ class StartedJobRegistry(BaseRegistry):
key_template = 'rq:wip:{0}'
def cleanup(self, timestamp: Optional[float] = None):
- """Remove expired jobs from registry and add them to FailedJobRegistry.
+ """Remove abandoned jobs from registry and add them to FailedJobRegistry.
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
@@ -226,6 +237,9 @@ class StartedJobRegistry(BaseRegistry):
except NoSuchJobError:
continue
+ job.execute_failure_callback(self.death_penalty_class, AbandonedJobError, AbandonedJobError(),
+ traceback.extract_stack())
+
retry = job.retries_left and job.retries_left > 0
if retry:
@@ -233,8 +247,11 @@ class StartedJobRegistry(BaseRegistry):
job.retry(queue, pipeline)
else:
+ exc_string = f"due to {AbandonedJobError.__name__}"
+ logger.warning(f'{self.__class__.__name__} cleanup: Moving job to {FailedJobRegistry.__name__} '
+ f'({exc_string})')
job.set_status(JobStatus.FAILED)
- job._exc_info = "Moved to FailedJobRegistry at %s" % datetime.now()
+ job._exc_info = f"Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}"
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline)
failed_job_registry.add(job, job.failure_ttl)
diff --git a/rq/results.py b/rq/results.py
index a6dafde..55ee971 100644
--- a/rq/results.py
+++ b/rq/results.py
@@ -16,7 +16,7 @@ def get_key(job_id):
return 'rq:results:%s' % job_id
-class Result(object):
+class Result:
class Type(Enum):
SUCCESSFUL = 1
FAILED = 2
@@ -85,7 +85,7 @@ class Result(object):
# response = job.connection.zrange(cls.get_key(job.id), 0, 10, desc=True, withscores=True)
response = job.connection.xrevrange(cls.get_key(job.id), '+', '-')
results = []
- for (result_id, payload) in response:
+ for result_id, payload in response:
results.append(
cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
)
@@ -169,7 +169,10 @@ class Result(object):
if pipeline is None:
self.id = result.decode()
if ttl is not None:
- connection.expire(key, ttl)
+ if ttl == -1:
+ connection.persist(key)
+ else:
+ connection.expire(key, ttl)
return self.id
def serialize(self):
diff --git a/rq/scheduler.py b/rq/scheduler.py
index 84802b6..67b6431 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -6,6 +6,7 @@ import traceback
from datetime import datetime
from enum import Enum
from multiprocessing import Process
+from typing import List
from redis import SSLConnection, UnixDomainSocketConnection
@@ -35,14 +36,14 @@ class RQScheduler:
Status = SchedulerStatus
def __init__(
- self,
- queues,
- connection,
- interval=1,
- logging_level=logging.INFO,
- date_format=DEFAULT_LOGGING_DATE_FORMAT,
- log_format=DEFAULT_LOGGING_FORMAT,
- serializer=None,
+ self,
+ queues,
+ connection,
+ interval=1,
+ logging_level=logging.INFO,
+ date_format=DEFAULT_LOGGING_DATE_FORMAT,
+ log_format=DEFAULT_LOGGING_FORMAT,
+ serializer=None,
):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set()
@@ -110,7 +111,7 @@ class RQScheduler:
"""Returns names of queue it successfully acquires lock on"""
successful_locks = set()
pid = os.getpid()
- self.log.debug("Trying to acquire locks for %s", ", ".join(self._queue_names))
+ self.log.debug('Trying to acquire locks for %s', ', '.join(self._queue_names))
for name in self._queue_names:
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60):
successful_locks.add(name)
@@ -166,7 +167,7 @@ class RQScheduler:
jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer)
for job in jobs:
if job is not None:
- queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
+ queue._enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
registry.remove(job, pipeline=pipeline)
pipeline.execute()
self._status = self.Status.STARTED
@@ -184,7 +185,7 @@ class RQScheduler:
def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks"""
- self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks))
+ self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks))
if len(self._queue_names) > 1:
with self.connection.pipeline() as pipeline:
for name in self._acquired_locks:
@@ -196,7 +197,7 @@ class RQScheduler:
self.connection.expire(key, self.interval + 60)
def stop(self):
- self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names))
+ self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._queue_names))
self.release_locks()
self._status = self.Status.STOPPED
@@ -232,10 +233,21 @@ class RQScheduler:
def run(scheduler):
- scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid())
+ scheduler.log.info('Scheduler for %s started with PID %s', ', '.join(scheduler._queue_names), os.getpid())
try:
scheduler.work()
except: # noqa
scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc())
raise
- scheduler.log.info("Scheduler with PID %s has stopped", os.getpid())
+ scheduler.log.info('Scheduler with PID %d has stopped', os.getpid())
+
+
+def parse_names(queues_or_names) -> List[str]:
+ """Given a list of strings or queues, returns queue names"""
+ names = []
+ for queue_or_name in queues_or_names:
+ if isinstance(queue_or_name, Queue):
+ names.append(queue_or_name.name)
+ else:
+ names.append(str(queue_or_name))
+ return names
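
parse_names() is a small new module-level helper that normalizes a mixed list of Queue objects and strings into queue names, for example:

    from redis import Redis
    from rq import Queue
    from rq.scheduler import parse_names

    redis = Redis()
    parse_names([Queue('high', connection=redis), 'default', 'low'])
    # -> ['high', 'default', 'low']
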
diff --git a/rq/timeouts.py b/rq/timeouts.py
index a1401c5..44f01f9 100644
--- a/rq/timeouts.py
+++ b/rq/timeouts.py
@@ -110,10 +110,14 @@ class TimerDeathPenalty(BaseDeathPenalty):
def setup_death_penalty(self):
"""Starts the timer."""
+ if self._timeout <= 0:
+ return
self._timer = self.new_timer()
self._timer.start()
def cancel_death_penalty(self):
"""Cancels the timer."""
+ if self._timeout <= 0:
+ return
self._timer.cancel()
self._timer = None
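
With the guard above, TimerDeathPenalty treats a non-positive timeout as "no timeout" instead of arming a timer that would fire immediately. A minimal sketch:

    from rq.timeouts import TimerDeathPenalty, JobTimeoutException

    # No timer is started (and none needs cancelling) when the timeout is <= 0,
    # so e.g. job_timeout=-1 no longer triggers a spurious JobTimeoutException.
    with TimerDeathPenalty(-1, JobTimeoutException):
        pass  # stands in for arbitrarily long work
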
diff --git a/rq/utils.py b/rq/utils.py
index c8c474d..051d5ee 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -96,7 +96,6 @@ def make_colorizer(color: str):
class ColorizingStreamHandler(logging.StreamHandler):
-
levels = {
logging.WARNING: make_colorizer('darkyellow'),
logging.ERROR: make_colorizer('darkred'),
@@ -182,10 +181,7 @@ def import_attribute(name: str) -> Callable[..., Any]:
E.g.: package_a.package_b.module_a.ClassA.my_static_method
Thus we remove the bits from the end of the name until we can import it
- Sometimes the failure during importing is due to a genuine coding error in the imported module
- In this case, the exception is logged as a warning for ease of debugging.
- The above logic will apply anyways regardless of the cause of the import error.
-
+
Args:
name (str): The name (reference) to the path.
@@ -204,7 +200,6 @@ def import_attribute(name: str) -> Callable[..., Any]:
module = importlib.import_module(module_name)
break
except ImportError:
- logger.warning("Import error for '%s'" % module_name, exc_info=True)
attribute_bits.insert(0, module_name_bits.pop())
if module is None:
@@ -354,7 +349,7 @@ def str_to_date(date_str: Optional[str]) -> Union[dt.datetime, Any]:
return utcparse(date_str.decode())
-def parse_timeout(timeout: Any):
+def parse_timeout(timeout: Union[int, float, str]) -> int:
"""Transfer all kinds of timeout format to an integer representing seconds"""
if not isinstance(timeout, numbers.Integral) and timeout is not None:
try:
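
parse_timeout() now carries explicit type hints; its behaviour is unchanged: integers pass through, and string values with a d/h/m/s suffix are converted to seconds, e.g.:

    from rq.utils import parse_timeout

    parse_timeout(180)    # -> 180
    parse_timeout('1h')   # -> 3600
    parse_timeout('15m')  # -> 900
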
diff --git a/rq/version.py b/rq/version.py
index 342b3ec..9fb1be8 100644
--- a/rq/version.py
+++ b/rq/version.py
@@ -1 +1 @@
-VERSION = '1.12.0'
+VERSION = '1.13.0'
diff --git a/rq/worker.py b/rq/worker.py
index a324ad6..80c0384 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -1,5 +1,7 @@
+import contextlib
import errno
import logging
+import math
import os
import random
import signal
@@ -8,14 +10,18 @@ import sys
import time
import traceback
import warnings
-
from datetime import timedelta
from enum import Enum
-from uuid import uuid4
from random import shuffle
-from typing import Any, Callable, List, Optional, TYPE_CHECKING, Tuple, Type, Union
+from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type,
+ Union)
+from uuid import uuid4
if TYPE_CHECKING:
+ try:
+ from resource import struct_rusage
+ except ImportError:
+ pass
from redis import Redis
from redis.client import Pipeline
@@ -23,12 +29,13 @@ try:
from signal import SIGKILL
except ImportError:
from signal import SIGTERM as SIGKILL
+
from contextlib import suppress
+
import redis.exceptions
from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
-from .utils import as_text
from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (
@@ -41,18 +48,20 @@ from .defaults import (
DEFAULT_LOGGING_DATE_FORMAT,
)
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
+
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue
from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler
+from .serializers import resolve_serializer
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
-from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact
+from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text
from .version import VERSION
-from .worker_registration import clean_worker_registry, get_keys
from .serializers import resolve_serializer
+
try:
from setproctitle import setproctitle as setprocname
except ImportError:
@@ -90,6 +99,12 @@ def signal_name(signum):
return 'SIG_UNKNOWN'
+class DequeueStrategy(str, Enum):
+ DEFAULT = "default"
+ ROUND_ROBIN = "round_robin"
+ RANDOM = "random"
+
+
class WorkerStatus(str, Enum):
STARTED = 'started'
SUSPENDED = 'suspended'
@@ -108,9 +123,9 @@ class Worker:
log_result_lifespan = True
# `log_job_description` is used to toggle logging an entire jobs description.
log_job_description = True
- # factor to increase connection_wait_time incase of continous connection failures.
+ # factor to increase connection_wait_time in case of continuous connection failures.
exponential_backoff_factor = 2.0
- # Max Wait time (in seconds) after which exponential_backoff_factor wont be applicable.
+ # Max Wait time (in seconds) after which exponential_backoff_factor won't be applicable.
max_connection_wait_time = 60.0
@classmethod
@@ -132,7 +147,7 @@ class Worker:
elif connection is None:
connection = get_current_connection()
- worker_keys = get_keys(queue=queue, connection=connection)
+ worker_keys = worker_registration.get_keys(queue=queue, connection=connection)
workers = [
cls.find_by_key(
key, connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer
@@ -152,7 +167,7 @@ class Worker:
Returns:
list_keys (List[str]): A list of worker keys
"""
- return [as_text(key) for key in get_keys(queue=queue, connection=connection)]
+ return [as_text(key) for key in worker_registration.get_keys(queue=queue, connection=connection)]
@classmethod
def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> int:
@@ -165,7 +180,7 @@ class Worker:
Returns:
length (int): The queue length.
"""
- return len(get_keys(queue=queue, connection=connection))
+ return len(worker_registration.get_keys(queue=queue, connection=connection))
@classmethod
def find_by_key(
@@ -226,6 +241,7 @@ class Worker:
exc_handler=None,
exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL,
+ maintenance_interval: int = DEFAULT_MAINTENANCE_TASK_INTERVAL,
job_class: Type['Job'] = None,
queue_class=None,
log_job_description: bool = True,
@@ -233,11 +249,12 @@ class Worker:
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True,
serializer=None,
+ work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None
): # noqa
-
self.default_result_ttl = default_result_ttl
self.worker_ttl = default_worker_ttl
self.job_monitoring_interval = job_monitoring_interval
+ self.maintenance_interval = maintenance_interval
connection = self._set_connection(connection)
self.connection = connection
@@ -250,7 +267,8 @@ class Worker:
self.serializer = resolve_serializer(serializer)
queues = [
- self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer)
+ self.queue_class(name=q, connection=connection, job_class=self.job_class,
+ serializer=self.serializer, death_penalty_class=self.death_penalty_class,)
if isinstance(q, str)
else q
for q in ensure_list(queues)
@@ -261,6 +279,7 @@ class Worker:
self.validate_queues()
self._ordered_queues = self.queues[:]
self._exc_handlers: List[Callable] = []
+ self._work_horse_killed_handler = work_horse_killed_handler
self._state: str = 'starting'
self._is_horse: bool = False
@@ -279,6 +298,7 @@ class Worker:
self.scheduler: Optional[RQScheduler] = None
self.pubsub = None
self.pubsub_thread = None
+ self._dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
self.disable_default_exception_handler = disable_default_exception_handler
@@ -532,8 +552,7 @@ class Worker:
return self.job_class.fetch(job_id, self.connection, self.serializer)
def _install_signal_handlers(self):
- """Installs signal handlers for handling SIGINT and SIGTERM gracefully.
- """
+ """Installs signal handlers for handling SIGINT and SIGTERM gracefully."""
signal.signal(signal.SIGINT, self.request_stop)
signal.signal(signal.SIGTERM, self.request_stop)
@@ -553,18 +572,14 @@ class Worker:
else:
raise
- def wait_for_horse(self) -> Tuple[Optional[int], Optional[int]]:
+ def wait_for_horse(self) -> Tuple[Optional[int], Optional[int], Optional['struct_rusage']]:
"""Waits for the horse process to complete.
Uses `0` as argument as to include "any child in the process group of the current process".
"""
- pid = None
- stat = None
- try:
- pid, stat = os.waitpid(self.horse_pid, 0)
- except ChildProcessError:
- # ChildProcessError: [Errno 10] No child processes
- pass
- return pid, stat
+ pid = stat = rusage = None
+ with contextlib.suppress(ChildProcessError): # ChildProcessError: [Errno 10] No child processes
+ pid, stat, rusage = os.wait4(self.horse_pid, 0)
+ return pid, stat, rusage
def request_force_stop(self, signum, frame):
"""Terminates the application (cold shutdown).
@@ -621,13 +636,11 @@ class Worker:
self.log.info('Warm shut down requested')
def check_for_suspension(self, burst: bool):
- """Check to see if workers have been suspended by `rq suspend`
- """
+ """Check to see if workers have been suspended by `rq suspend`"""
before_state = None
notified = False
while not self._stop_requested and is_suspended(self.connection, self):
-
if burst:
self.log.info('Suspended in burst mode, exiting')
self.log.info('Note: There could still be unfinished jobs on the queue')
@@ -674,14 +687,90 @@ class Worker:
self.pubsub.unsubscribe()
self.pubsub.close()
- def reorder_queues(self, reference_queue):
- """Method placeholder to workers that implement some reordering strategy.
- `pass` here means that the queue will remain with the same job order.
+ def reorder_queues(self, reference_queue: 'Queue'):
+ """Reorder the queues according to the strategy.
+ Since the strategy can be defined either in the `Worker` initialization or in the `work` method,
+ it doesn't take the strategy directly, but rather uses the private `_dequeue_strategy` attribute.
Args:
- reference_queue (Union[Queue, str]): The queue
- """
- pass
+ reference_queue (Union[Queue, str]): The queue to use as the reference point for reordering
+ """
+ if self._dequeue_strategy is None:
+ self._dequeue_strategy = DequeueStrategy.DEFAULT
+
+ if self._dequeue_strategy not in ("default", "random", "round_robin"):
+ raise ValueError(
+ f"Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`."
+ )
+ if self._dequeue_strategy == DequeueStrategy.DEFAULT:
+ return
+ if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
+ pos = self._ordered_queues.index(reference_queue)
+ self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
+ return
+ if self._dequeue_strategy == DequeueStrategy.RANDOM:
+ shuffle(self._ordered_queues)
+ return
+
+ def bootstrap(
+ self,
+ logging_level: str = "INFO",
+ date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
+ log_format: str = DEFAULT_LOGGING_FORMAT
+ ):
+ """Bootstraps the worker.
+ Runs the basic tasks that should run when the worker actually starts working.
+ Used so that new workers can focus on the work loop implementation rather
+ than the full bootstrapping process.
+
+ Args:
+ logging_level (str, optional): Logging level to use. Defaults to "INFO".
+ date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
+ log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
+ """
+ setup_loghandlers(logging_level, date_format, log_format)
+ self.register_birth()
+ self.log.info('Worker %s: started, version %s', self.key, VERSION)
+ self.subscribe()
+ self.set_state(WorkerStatus.STARTED)
+ qnames = self.queue_names()
+ self.log.info('*** Listening on %s...', green(', '.join(qnames)))
+
+ def _start_scheduler(
+ self,
+ burst: bool = False,
+ logging_level: str = "INFO",
+ date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
+ log_format: str = DEFAULT_LOGGING_FORMAT,
+ ):
+ """Starts the scheduler process.
+ This is specifically designed to be run by the worker when running the `work()` method.
+ Instantiates the RQScheduler and tries to acquire a lock.
+ If the lock is acquired, start scheduler.
+ If the worker is in burst mode, it just enqueues scheduled jobs and quits;
+ otherwise, it starts the scheduler in a separate process.
+
+ Args:
+ burst (bool, optional): Whether to work on burst mode. Defaults to False.
+ logging_level (str, optional): Logging level to use. Defaults to "INFO".
+ date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
+ log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
+ """
+ self.scheduler = RQScheduler(
+ self.queues,
+ connection=self.connection,
+ logging_level=logging_level,
+ date_format=date_format,
+ log_format=log_format,
+ serializer=self.serializer,
+ )
+ self.scheduler.acquire_locks()
+ if self.scheduler.acquired_locks:
+ if burst:
+ self.scheduler.enqueue_scheduled_jobs()
+ self.scheduler.release_locks()
+ else:
+ self.scheduler.start()
def work(
self,
@@ -690,13 +779,16 @@ class Worker:
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
max_jobs: Optional[int] = None,
+ max_idle_time: Optional[int] = None,
with_scheduler: bool = False,
+ dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
) -> bool:
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the
queues, unless `burst` mode is enabled.
+ If `max_idle_time` is provided, the worker will exit after being idle for more than the given number of seconds.
The return value indicates whether any jobs were processed.
@@ -706,39 +798,18 @@ class Worker:
date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
+ max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
+ dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. Defaults to DequeueStrategy.DEFAULT
Returns:
worked (bool): Will return True if any job was processed, False otherwise.
"""
- setup_loghandlers(logging_level, date_format, log_format)
+ self.bootstrap(logging_level, date_format, log_format)
+ self._dequeue_strategy = dequeue_strategy
completed_jobs = 0
- self.register_birth()
- self.log.info("Worker %s: started, version %s", self.key, VERSION)
- self.subscribe()
- self.set_state(WorkerStatus.STARTED)
- qnames = self.queue_names()
- self.log.info('*** Listening on %s...', green(', '.join(qnames)))
-
if with_scheduler:
- self.scheduler = RQScheduler(
- self.queues,
- connection=self.connection,
- logging_level=logging_level,
- date_format=date_format,
- log_format=log_format,
- serializer=self.serializer,
- )
- self.scheduler.acquire_locks()
- # If lock is acquired, start scheduler
- if self.scheduler.acquired_locks:
- # If worker is run on burst mode, enqueue_scheduled_jobs()
- # before working. Otherwise, start scheduler in a separate process
- if burst:
- self.scheduler.enqueue_scheduled_jobs()
- self.scheduler.release_locks()
- else:
- self.scheduler.start()
+ self._start_scheduler(burst, logging_level, date_format, log_format)
self._install_signal_handlers()
try:
@@ -754,25 +825,26 @@ class Worker:
break
timeout = None if burst else self.dequeue_timeout
- result = self.dequeue_job_and_maintain_ttl(timeout)
+ result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time)
if result is None:
if burst:
- self.log.info("Worker %s: done, quitting", self.key)
+ self.log.info('Worker %s: done, quitting', self.key)
+ elif max_idle_time is not None:
+ self.log.info('Worker %s: idle for %d seconds, quitting', self.key, max_idle_time)
break
job, queue = result
- self.reorder_queues(reference_queue=queue)
self.execute_job(job, queue)
self.heartbeat()
completed_jobs += 1
if max_jobs is not None:
if completed_jobs >= max_jobs:
- self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs)
+ self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs)
break
except redis.exceptions.TimeoutError:
- self.log.error(f"Worker {self.key}: Redis connection timeout, quitting...")
+ self.log.error('Worker %s: Redis connection timeout, quitting...', self.key)
break
except StopRequested:
@@ -786,15 +858,16 @@ class Worker:
self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True)
break
finally:
- if not self.is_horse:
-
- if self.scheduler:
- self.stop_scheduler()
-
- self.register_death()
- self.unsubscribe()
+ self.teardown()
return bool(completed_jobs)
+ def teardown(self):
+ if not self.is_horse:
+ if self.scheduler:
+ self.stop_scheduler()
+ self.register_death()
+ self.unsubscribe()
+
def stop_scheduler(self):
"""Ensure scheduler process is stopped
Will send the kill signal to scheduler process,
@@ -808,7 +881,7 @@ class Worker:
pass
self.scheduler._process.join()
- def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']:
+ def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']:
"""Dequeues a job while maintaining the TTL.
Returns:
@@ -821,25 +894,31 @@ class Worker:
self.procline('Listening on ' + qnames)
self.log.debug('*** Listening on %s...', green(qnames))
connection_wait_time = 1.0
+ idle_since = utcnow()
+ idle_time_left = max_idle_time
while True:
-
try:
self.heartbeat()
if self.should_run_maintenance_tasks:
self.run_maintenance_tasks()
- self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}")
+ if timeout is not None and idle_time_left is not None:
+ timeout = min(timeout, idle_time_left)
+
+ self.log.debug('Dequeueing jobs on queues %s and timeout %d', green(qnames), timeout)
result = self.queue_class.dequeue_any(
self._ordered_queues,
timeout,
connection=self.connection,
job_class=self.job_class,
serializer=self.serializer,
+ death_penalty_class=self.death_penalty_class,
)
if result is not None:
job, queue = result
- self.log.debug(f"Dequeued job {blue(job.id)} from {green(queue.name)}")
+ self.reorder_queues(reference_queue=queue)
+ self.log.debug('Dequeued job %s from %s', blue(job.id), green(queue.name))
job.redis_server_version = self.get_redis_server_version()
if self.log_job_description:
self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id)
@@ -848,7 +927,11 @@ class Worker:
break
except DequeueTimeout:
- pass
+ if max_idle_time is not None:
+ idle_for = (utcnow() - idle_since).total_seconds()
+ idle_time_left = math.ceil(max_idle_time - idle_for)
+ if idle_time_left <= 0:
+ break
except redis.exceptions.ConnectionError as conn_err:
self.log.error(
'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time
@@ -1028,12 +1111,12 @@ class Worker:
job (Job): _description_
queue (Queue): _description_
"""
- ret_val = None
+ retpid = ret_val = rusage = None
job.started_at = utcnow()
while True:
try:
- with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
- retpid, ret_val = self.wait_for_horse()
+ with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
+ retpid, ret_val, rusage = self.wait_for_horse()
break
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
@@ -1073,20 +1156,20 @@ class Worker:
elif self._stopped_job_id == job.id:
# Work-horse killed deliberately
self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
- self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
+ self.handle_job_failure(job, queue=queue, exc_string='Job stopped by user, work-horse terminated.')
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()
# Unhandled failure: move the job to the failed queue
- self.log.warning(
- ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format(
- ret_val
- )
- )
+ signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ''
+ exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
+ self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string)
+ self.handle_work_horse_killed(job, retpid, ret_val, rusage)
self.handle_job_failure(
- job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val
+ job, queue=queue,
+ exc_string=exc_string
)
def execute_job(self, job: 'Job', queue: 'Queue'):
@@ -1156,7 +1239,7 @@ class Worker:
"""Performs misc bookkeeping like updating states prior to
job execution.
"""
- self.log.debug(f"Preparing for execution of Job ID {job.id}")
+ self.log.debug('Preparing for execution of Job ID %s', job.id)
with self.connection.pipeline() as pipeline:
self.set_current_job_id(job.id, pipeline=pipeline)
self.set_current_job_working_time(0, pipeline=pipeline)
@@ -1167,7 +1250,7 @@ class Worker:
job.prepare_for_execution(self.name, pipeline=pipeline)
pipeline.execute()
- self.log.debug(f"Job preparation finished.")
+ self.log.debug('Job preparation finished.')
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
@@ -1267,7 +1350,7 @@ class Worker:
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
- self.log.debug(f"Saving job {job.id}'s successful execution result")
+ self.log.debug('Saving job %s\'s successful execution result', job.id)
job._handle_success(result_ttl, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
@@ -1280,30 +1363,6 @@ class Worker:
except redis.exceptions.WatchError:
continue
- def execute_success_callback(self, job: 'Job', result: Any):
- """Executes success_callback for a job.
- with timeout .
-
- Args:
- job (Job): The Job
- result (Any): The job's result.
- """
- self.log.debug(f"Running success callbacks for {job.id}")
- job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
- with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
- job.success_callback(job, self.connection, result)
-
- def execute_failure_callback(self, job: 'Job'):
- """Executes failure_callback with timeout
-
- Args:
- job (Job): The Job
- """
- self.log.debug(f"Running failure callbacks for {job.id}")
- job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
- with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
- job.failure_callback(job, self.connection, *sys.exc_info())
-
def perform_job(self, job: 'Job', queue: 'Queue') -> bool:
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
@@ -1317,7 +1376,7 @@ class Worker:
"""
push_connection(self.connection)
started_job_registry = queue.started_job_registry
- self.log.debug("Started Job Registry set.")
+ self.log.debug('Started Job Registry set.')
try:
self.prepare_job_execution(job)
@@ -1325,9 +1384,9 @@ class Worker:
job.started_at = utcnow()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
- self.log.debug("Performing Job...")
+ self.log.debug('Performing Job...')
rv = job.perform()
- self.log.debug(f"Finished performing Job ID {job.id}")
+ self.log.debug('Finished performing Job ID %s', job.id)
job.ended_at = utcnow()
@@ -1335,23 +1394,22 @@ class Worker:
# to use the same exc handling when pickling fails
job._result = rv
- if job.success_callback:
- self.execute_success_callback(job, rv)
+ job.heartbeat(utcnow(), job.success_callback_timeout)
+ job.execute_success_callback(self.death_penalty_class, rv)
self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry)
except: # NOQA
- self.log.debug(f"Job {job.id} raised an exception.")
+ self.log.debug('Job %s raised an exception.', job.id)
job.ended_at = utcnow()
exc_info = sys.exc_info()
exc_string = ''.join(traceback.format_exception(*exc_info))
- if job.failure_callback:
- try:
- self.execute_failure_callback(job)
- except: # noqa
- self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True)
- exc_info = sys.exc_info()
- exc_string = ''.join(traceback.format_exception(*exc_info))
+ try:
+ job.heartbeat(utcnow(), job.failure_callback_timeout)
+ job.execute_failure_callback(self.death_penalty_class, *exc_info)
+ except: # noqa
+ exc_info = sys.exc_info()
+ exc_string = ''.join(traceback.format_exception(*exc_info))
self.handle_job_failure(
job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry
@@ -1364,8 +1422,7 @@ class Worker:
self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id)
if rv is not None:
- log_result = "{0!r}".format(as_text(str(rv)))
- self.log.debug('Result: %s', yellow(log_result))
+ self.log.debug('Result: %r', yellow(as_text(str(rv))))
if self.log_result_lifespan:
result_ttl = job.get_result_ttl(self.default_result_ttl)
@@ -1384,7 +1441,7 @@ class Worker:
the other properties are accessed, which will stop exceptions from
being properly logged, so we guard against it here.
"""
- self.log.debug(f"Handling exception for {job.id}.")
+ self.log.debug('Handling exception for %s.', job.id)
exc_string = ''.join(traceback.format_exception(*exc_info))
try:
extra = {
@@ -1401,7 +1458,9 @@ class Worker:
extra.update({'queue': job.origin, 'job_id': job.id})
# func_name
- self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra)
+ self.log.error(
+ '[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra
+ )
for handler in self._exc_handlers:
self.log.debug('Invoking exception handler %s', handler)
@@ -1423,6 +1482,12 @@ class Worker:
"""Pops the latest exception handler off of the exc handler stack."""
return self._exc_handlers.pop()
+ def handle_work_horse_killed(self, job, retpid, ret_val, rusage):
+ if self._work_horse_killed_handler is None:
+ return
+
+ self._work_horse_killed_handler(job, retpid, ret_val, rusage)
+
def __eq__(self, other):
"""Equality does not take the database/connection into account"""
if not isinstance(other, self.__class__):
@@ -1441,7 +1506,7 @@ class Worker:
if queue.acquire_cleaning_lock():
self.log.info('Cleaning registries for queue: %s', queue.name)
clean_registries(queue)
- clean_worker_registry(queue)
+ worker_registration.clean_worker_registry(queue)
self.last_cleaned_at = utcnow()
@property
@@ -1449,7 +1514,7 @@ class Worker:
"""Maintenance tasks should run on first startup or every 10 minutes."""
if self.last_cleaned_at is None:
return True
- if (utcnow() - self.last_cleaned_at) > timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL):
+ if (utcnow() - self.last_cleaned_at) > timedelta(seconds=self.maintenance_interval):
return True
return False
@@ -1490,6 +1555,7 @@ class HerokuWorker(Worker):
* sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
causes the horse to crash `imminent_shutdown_delay` seconds later
"""
+
imminent_shutdown_delay = 6
frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace']
@@ -1532,7 +1598,7 @@ class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
- self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
+ self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
class RandomWorker(Worker):
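
Taken together, the worker changes above add a configurable maintenance interval, a work-horse-killed handler, an optional idle timeout and a pluggable dequeue strategy. A sketch of how they combine, assuming a local Redis (queue names and the handler are illustrative):

    from redis import Redis
    from rq import Queue, Worker
    from rq.worker import DequeueStrategy

    def on_horse_killed(job, retpid, ret_val, rusage):
        # Called when a work-horse exits unexpectedly (e.g. OOM-killed);
        # `rusage` comes from os.wait4() in wait_for_horse().
        print(f'work-horse for job {job.id} died: pid={retpid}, status={ret_val}')

    redis = Redis()
    worker = Worker(
        [Queue('high', connection=redis), Queue('default', connection=redis)],
        connection=redis,
        maintenance_interval=600,                  # run maintenance tasks every 10 minutes
        work_horse_killed_handler=on_horse_killed,
    )
    worker.work(
        with_scheduler=True,                       # scheduler started via _start_scheduler()
        max_idle_time=300,                         # exit after 5 idle minutes
        dequeue_strategy=DequeueStrategy.ROUND_ROBIN,
    )
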
diff --git a/rq/worker_registration.py b/rq/worker_registration.py
index 0f31f1d..fe4dc04 100644
--- a/rq/worker_registration.py
+++ b/rq/worker_registration.py
@@ -86,7 +86,6 @@ def clean_worker_registry(queue: 'Queue'):
keys = list(get_keys(queue))
with queue.connection.pipeline() as pipeline:
-
for key in keys:
pipeline.exists(key)
results = pipeline.execute()
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 07b9c39..daa118b 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -5,6 +5,7 @@ from uuid import uuid4
import os
import json
+from click import BadParameter
from click.testing import CliRunner
from redis import Redis
@@ -14,6 +15,7 @@ from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, pars
from rq.job import Job
from rq.registry import FailedJobRegistry, ScheduledJobRegistry
from rq.serializers import JSONSerializer
+from rq.timeouts import UnixSignalDeathPenalty
from rq.worker import Worker, WorkerStatus
from rq.scheduler import RQScheduler
@@ -118,6 +120,23 @@ class TestRQCli(RQTestCase):
'testhost.example.com',
)
+ def test_death_penalty_class(self):
+ cli_config = CliConfig()
+
+ self.assertEqual(
+ UnixSignalDeathPenalty,
+ cli_config.death_penalty_class
+ )
+
+ cli_config = CliConfig(death_penalty_class='rq.job.Job')
+ self.assertEqual(
+ Job,
+ cli_config.death_penalty_class
+ )
+
+ with self.assertRaises(BadParameter):
+ CliConfig(death_penalty_class='rq.abcd')
+
def test_empty_nothing(self):
"""rq empty -u <url>"""
runner = CliRunner()
@@ -326,6 +345,21 @@ class TestRQCli(RQTestCase):
result = runner.invoke(main, args + ['--quiet', '--verbose'])
self.assertNotEqual(result.exit_code, 0)
+ def test_worker_dequeue_strategy(self):
+ """--quiet and --verbose logging options are supported"""
+ runner = CliRunner()
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'random']
+ result = runner.invoke(main, args)
+ self.assert_normal_execution(result)
+
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'round_robin']
+ result = runner.invoke(main, args)
+ self.assert_normal_execution(result)
+
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'wrong']
+ result = runner.invoke(main, args)
+ self.assertEqual(result.exit_code, 1)
+
def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>"""
connection = Redis.from_url(self.redis_url)
diff --git a/tests/test_connection.py b/tests/test_connection.py
index fdfafbd..393c20d 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -1,9 +1,7 @@
from redis import Redis
-from rq import Connection, Queue, use_connection, get_current_connection, pop_connection
-from rq.connections import NoRedisConnectionException
-
-from tests import find_empty_redis_database, RQTestCase
+from rq import Connection, Queue
+from tests import RQTestCase, find_empty_redis_database
from tests.fixtures import do_nothing
@@ -37,30 +35,3 @@ class TestConnectionInheritance(RQTestCase):
job2 = q2.enqueue(do_nothing)
self.assertEqual(q1.connection, job1.connection)
self.assertEqual(q2.connection, job2.connection)
-
-
-class TestConnectionHelpers(RQTestCase):
- def test_use_connection(self):
- """Test function use_connection works as expected."""
- conn = new_connection()
- use_connection(conn)
-
- self.assertEqual(conn, get_current_connection())
-
- use_connection()
-
- self.assertNotEqual(conn, get_current_connection())
-
- use_connection(self.testconn) # Restore RQTestCase connection
-
- with self.assertRaises(AssertionError):
- with Connection(new_connection()):
- use_connection()
- with Connection(new_connection()):
- use_connection()
-
- def test_resolve_connection_raises_on_no_connection(self):
- """Test function resolve_connection raises if there is no connection."""
- pop_connection()
- with self.assertRaises(NoRedisConnectionException):
- Queue()
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 26b115d..a290a87 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -117,6 +117,37 @@ class TestDependencies(RQTestCase):
self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"])
+ def test_dependency_list_in_depends_on(self):
+ """Enqueue with Dependency list in depends_on"""
+ q = Queue(connection=self.testconn)
+ w = SimpleWorker([q], connection=q.connection)
+
+ # enqueue dependent job when parent successfully finishes
+ parent_job1 = q.enqueue(say_hello)
+ parent_job2 = q.enqueue(say_hello)
+ job = q.enqueue_call(say_hello, depends_on=[Dependency([parent_job1]), Dependency([parent_job2])])
+ w.work(burst=True)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
+
+ def test_enqueue_job_dependency(self):
+ """Enqueue via Queue.enqueue_job() with depencency"""
+ q = Queue(connection=self.testconn)
+ w = SimpleWorker([q], connection=q.connection)
+
+ # enqueue dependent job when parent successfully finishes
+ parent_job = Job.create(say_hello)
+ parent_job.save()
+ job = Job.create(say_hello, depends_on=parent_job)
+ q.enqueue_job(job)
+ w.work(burst=True)
+ self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+ q.enqueue_job(parent_job)
+ w.work(burst=True)
+ self.assertEqual(parent_job.get_status(), JobStatus.FINISHED)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
+
def test_dependencies_are_met_if_parent_is_canceled(self):
"""When parent job is canceled, it should be treated as failed"""
queue = Queue(connection=self.testconn)
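
The new test above exercises passing a list of Dependency objects in depends_on. A hedged usage sketch (task names and job variables are illustrative):

    from redis import Redis
    from rq import Queue
    from rq.job import Dependency

    def extract(): ...
    def transform(): ...
    def report(): ...

    queue = Queue(connection=Redis())
    extract_job = queue.enqueue(extract)
    transform_job = queue.enqueue(transform)
    # The report job only runs once every Dependency in the list is satisfied.
    queue.enqueue(report, depends_on=[Dependency(jobs=[extract_job]),
                                      Dependency(jobs=[transform_job])])
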
diff --git a/tests/test_helpers.py b/tests/test_helpers.py
index b43f13b..5a84f71 100644
--- a/tests/test_helpers.py
+++ b/tests/test_helpers.py
@@ -1,11 +1,12 @@
from rq.cli.helpers import get_redis_from_config
from tests import RQTestCase
-
+from unittest import mock
class TestHelpers(RQTestCase):
- def test_get_redis_from_config(self):
+ @mock.patch('rq.cli.helpers.Sentinel')
+ def test_get_redis_from_config(self, sentinel_class_mock):
"""Ensure Redis connection params are properly parsed"""
settings = {
'REDIS_URL': 'redis://localhost:1/1'
@@ -39,3 +40,46 @@ class TestHelpers(RQTestCase):
self.assertEqual(connection_kwargs['db'], 2)
self.assertEqual(connection_kwargs['port'], 2)
self.assertEqual(connection_kwargs['password'], 'bar')
+
+ # Add Sentinel to the settings
+ settings.update({
+ 'SENTINEL': {
+ 'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],
+ 'MASTER_NAME': 'master',
+ 'DB': 2,
+ 'USERNAME': 'redis-user',
+ 'PASSWORD': 'redis-secret',
+ 'SOCKET_TIMEOUT': None,
+ 'CONNECTION_KWARGS': {
+ 'ssl_ca_path': None,
+ },
+ 'SENTINEL_KWARGS': {
+ 'username': 'sentinel-user',
+ 'password': 'sentinel-secret',
+ },
+ },
+ })
+
+ # Ensure SENTINEL is preferred against REDIS_* parameters
+ redis = get_redis_from_config(settings)
+ sentinel_init_sentinels_args = sentinel_class_mock.call_args[0]
+ sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1]
+ self.assertEqual(
+ sentinel_init_sentinels_args,
+ ([('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],)
+ )
+ self.assertDictEqual(
+ sentinel_init_sentinel_kwargs,
+ {
+ 'db': 2,
+ 'ssl': False,
+ 'username': 'redis-user',
+ 'password': 'redis-secret',
+ 'socket_timeout': None,
+ 'ssl_ca_path': None,
+ 'sentinel_kwargs': {
+ 'username': 'sentinel-user',
+ 'password': 'sentinel-secret',
+ }
+ }
+ )
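
The Sentinel support tested above reads its configuration from the same settings source the CLI consumes; a hedged sketch of such a settings module (hosts and credentials are placeholders), loaded e.g. via `rq worker -c settings`:

    # settings.py -- SENTINEL takes precedence over the REDIS_* keys when both are present.
    SENTINEL = {
        'INSTANCES': [('sentinel1.example.org', 26379), ('sentinel2.example.org', 26379)],
        'MASTER_NAME': 'master',
        'DB': 0,
        'USERNAME': 'redis-user',
        'PASSWORD': 'redis-secret',
        'SOCKET_TIMEOUT': None,
        'SENTINEL_KWARGS': {'username': 'sentinel-user', 'password': 'sentinel-secret'},
    }
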
diff --git a/tests/test_job.py b/tests/test_job.py
index 9d1ceae..23bbd11 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -1,4 +1,6 @@
import json
+
+from rq.defaults import CALLBACK_TIMEOUT
from rq.serializers import JSONSerializer
import time
import queue
@@ -9,7 +11,7 @@ from redis import WatchError
from rq.utils import as_text
from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
-from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job
+from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job, Callback
from rq.queue import Queue
from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry,
@@ -209,9 +211,9 @@ class TestJob(RQTestCase):
# ... and no other keys are stored
self.assertEqual(
- set(self.testconn.hkeys(job.key)),
{b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at',
- b'worker_name', b'success_callback_name', b'failure_callback_name'}
+ b'worker_name', b'success_callback_name', b'failure_callback_name'},
+ set(self.testconn.hkeys(job.key))
)
self.assertEqual(job.last_heartbeat, None)
@@ -241,6 +243,31 @@ class TestJob(RQTestCase):
self.assertEqual(stored_job.dependency.id, parent_job.id)
self.assertEqual(stored_job.dependency, parent_job)
+ def test_persistence_of_callbacks(self):
+ """Storing jobs with success and/or failure callbacks."""
+ job = Job.create(func=fixtures.some_calculation,
+ on_success=Callback(fixtures.say_hello, timeout=10),
+ on_failure=fixtures.say_pid) # deprecated callable
+ job.save()
+ stored_job = Job.fetch(job.id)
+
+ self.assertEqual(fixtures.say_hello, stored_job.success_callback)
+ self.assertEqual(10, stored_job.success_callback_timeout)
+ self.assertEqual(fixtures.say_pid, stored_job.failure_callback)
+ self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout)
+
+ # None(s)
+ job = Job.create(func=fixtures.some_calculation,
+ on_failure=None)
+ job.save()
+ stored_job = Job.fetch(job.id)
+ self.assertIsNone(stored_job.success_callback)
+ self.assertEqual(CALLBACK_TIMEOUT, job.success_callback_timeout)  # timeout should never be None
+ self.assertEqual(CALLBACK_TIMEOUT, stored_job.success_callback_timeout)
+ self.assertIsNone(stored_job.failure_callback)
+ self.assertEqual(CALLBACK_TIMEOUT, job.failure_callback_timeout)  # timeout should never be None
+ self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout)
+
def test_store_then_fetch(self):
"""Store, then fetch."""
job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4),
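
As the callback-persistence test above shows, callbacks can now be wrapped in the new Callback class to give each one its own timeout, while plain callables remain supported and fall back to CALLBACK_TIMEOUT. A short sketch with illustrative task and callback names:

    from redis import Redis
    from rq import Queue
    from rq.job import Callback

    def charge(): ...
    def on_ok(job, connection, result): ...
    def on_fail(job, connection, exc_type, exc_value, tb): ...

    queue = Queue(connection=Redis())
    queue.enqueue(
        charge,
        on_success=Callback(on_ok, timeout=10),    # success callback limited to 10 seconds
        on_failure=Callback(on_fail, timeout=30),  # failure callback limited to 30 seconds
    )
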
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 28a29ca..57584b5 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -1,9 +1,12 @@
from datetime import datetime, timedelta
+from unittest import mock
+from unittest.mock import PropertyMock, ANY
+
from rq.serializers import JSONSerializer
from rq.utils import as_text
from rq.defaults import DEFAULT_FAILURE_TTL
-from rq.exceptions import InvalidJobOperation
+from rq.exceptions import InvalidJobOperation, AbandonedJobError
from rq.job import Job, JobStatus, requeue_job
from rq.queue import Queue
from rq.utils import current_timestamp
@@ -161,7 +164,9 @@ class TestRegistry(RQTestCase):
self.assertNotIn(job, failed_job_registry)
self.assertIn(job, self.registry)
- self.registry.cleanup()
+ with mock.patch.object(Job, 'execute_failure_callback') as mocked:
+ self.registry.cleanup()
+ mocked.assert_called_once_with(queue.death_penalty_class, AbandonedJobError, ANY, ANY)
self.assertIn(job.id, failed_job_registry)
self.assertNotIn(job, self.registry)
job.refresh()
diff --git a/tests/test_results.py b/tests/test_results.py
index 4f705f5..9bc1b9e 100644
--- a/tests/test_results.py
+++ b/tests/test_results.py
@@ -214,3 +214,25 @@ class TestScheduledJobRegistry(RQTestCase):
job = queue.enqueue(div_by_zero)
self.assertEqual(job.latest_result().type, Result.Type.FAILED)
+
+ def test_job_return_value_result_ttl_infinity(self):
+ """Test job.return_value when queue.result_ttl=-1"""
+ queue = Queue(connection=self.connection, result_ttl=-1)
+ job = queue.enqueue(say_hello)
+
+ # Returns None when there's no result
+ self.assertIsNone(job.return_value())
+
+ Result.create(job, Result.Type.SUCCESSFUL, ttl=-1, return_value=1)
+ self.assertEqual(job.return_value(), 1)
+
+ def test_job_return_value_result_ttl_zero(self):
+ """Test job.return_value when queue.result_ttl=0"""
+ queue = Queue(connection=self.connection, result_ttl=0)
+ job = queue.enqueue(say_hello)
+
+ # Returns None when there's no result
+ self.assertIsNone(job.return_value())
+
+ Result.create(job, Result.Type.SUCCESSFUL, ttl=0, return_value=1)
+ self.assertIsNone(job.return_value())
diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py
index 2872ee0..1f392a3 100644
--- a/tests/test_timeouts.py
+++ b/tests/test_timeouts.py
@@ -42,3 +42,10 @@ class TestTimeouts(RQTestCase):
self.assertIn(job, failed_job_registry)
job.refresh()
self.assertIn("rq.timeouts.JobTimeoutException", job.exc_info)
+
+ # Test that a negative timeout doesn't raise JobTimeoutException,
+ # which would mean an unintended immediate timeout.
+ job = q.enqueue(thread_friendly_sleep_func, args=(1,), job_timeout=-1)
+ w.work(burst=True)
+ job.refresh()
+ self.assertIn(job, finished_job_registry)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index cfce473..dfa0f1d 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -19,6 +19,7 @@ import pytest
from unittest import mock
from unittest.mock import Mock
+from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
from tests import RQTestCase, slow
from tests.fixtures import (
access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing,
@@ -607,6 +608,31 @@ class TestWorker(RQTestCase):
# Should not have created evidence of execution
self.assertEqual(os.path.exists(SENTINEL_FILE), False)
+ @slow
+ def test_max_idle_time(self):
+ q = Queue()
+ w = Worker([q])
+ q.enqueue(say_hello, args=('Frank',))
+ self.assertIsNotNone(w.dequeue_job_and_maintain_ttl(1))
+
+ # idle for 1 second
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=1))
+
+ # idle for 3 seconds
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=3))
+ self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer
+
+ # idle for 2 seconds because idle_time is less than timeout
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2))
+ self.assertLess((utcnow()-now).total_seconds(), 4) # 4 for some buffer
+
+ # idle for 3 seconds because idle_time is less than two rounds of timeout
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(2, max_idle_time=3))
+ self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer
+
@slow # noqa
def test_timeouts(self):
"""Worker kills jobs after timeout."""
@@ -639,7 +665,6 @@ class TestWorker(RQTestCase):
q = Queue()
w = Worker([q])
- # Put it on the queue with a timeout value
self.assertIsNone(w.dequeue_job_and_maintain_ttl(None))
def test_worker_ttl_param_resolves_timeout(self):
@@ -936,7 +961,15 @@ class TestWorker(RQTestCase):
worker.last_cleaned_at = utcnow()
self.assertFalse(worker.should_run_maintenance_tasks)
- worker.last_cleaned_at = utcnow() - timedelta(seconds=3700)
+ worker.last_cleaned_at = utcnow() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 100)
+ self.assertTrue(worker.should_run_maintenance_tasks)
+
+ # custom maintenance_interval
+ worker = Worker(queue, maintenance_interval=10)
+ self.assertTrue(worker.should_run_maintenance_tasks)
+ worker.last_cleaned_at = utcnow()
+ self.assertFalse(worker.should_run_maintenance_tasks)
+ worker.last_cleaned_at = utcnow() - timedelta(seconds=11)
self.assertTrue(worker.should_run_maintenance_tasks)
def test_worker_calls_clean_registries(self):
@@ -1103,6 +1136,59 @@ class TestWorker(RQTestCase):
worker = Worker.find_by_key(w2.key)
self.assertEqual(worker.python_version, python_version)
+ def test_dequeue_random_strategy(self):
+ qs = [Queue('q%d' % i) for i in range(5)]
+
+ for i in range(5):
+ for j in range(3):
+ qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
+
+ w = Worker(qs)
+ w.work(burst=True, dequeue_strategy="random")
+
+ start_times = []
+ for i in range(5):
+ for j in range(3):
+ job = Job.fetch('q%d_%d' % (i, j))
+ start_times.append(('q%d_%d' % (i, j), job.started_at))
+ sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
+ sorted_ids = [tup[0] for tup in sorted_by_time]
+ expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)]
+ expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)]
+
+ self.assertNotEqual(sorted_ids, expected_rr)
+ self.assertNotEqual(sorted_ids, expected_ser)
+ expected_rr.reverse()
+ expected_ser.reverse()
+ self.assertNotEqual(sorted_ids, expected_rr)
+ self.assertNotEqual(sorted_ids, expected_ser)
+ sorted_ids.sort()
+ expected_ser.sort()
+ self.assertEqual(sorted_ids, expected_ser)
+
+ def test_dequeue_round_robin(self):
+ qs = [Queue('q%d' % i) for i in range(5)]
+
+ for i in range(5):
+ for j in range(3):
+ qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
+
+ w = Worker(qs)
+ w.work(burst=True, dequeue_strategy="round_robin")
+
+ start_times = []
+ for i in range(5):
+ for j in range(3):
+ job = Job.fetch('q%d_%d' % (i, j))
+ start_times.append(('q%d_%d' % (i, j), job.started_at))
+ sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
+ sorted_ids = [tup[0] for tup in sorted_by_time]
+ expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0',
+ 'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1',
+ 'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2']
+
+ self.assertEqual(expected, sorted_ids)
+
def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait)
@@ -1224,13 +1310,14 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w.prepare_job_execution(job)
w.fork_work_horse(job, queue)
job.timeout = 5
-
time.sleep(1)
with open(sentinel_file) as f:
subprocess_pid = int(f.read().strip())
self.assertTrue(psutil.pid_exists(subprocess_pid))
- w.monitor_work_horse(job, queue)
+ with mock.patch.object(w, 'handle_work_horse_killed', wraps=w.handle_work_horse_killed) as mocked:
+ w.monitor_work_horse(job, queue)
+ self.assertEqual(mocked.call_count, 1)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor