diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2023-03-23 11:23:50 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2023-03-23 11:23:50 +0700 |
commit | a4a74ee377b97b8b7ce57b020c45b21ede56cff2 (patch) | |
tree | ef4c0e7c1c62f485f00f7b85277beecaf515c120 | |
parent | 63ed6216043ba2b5c3d18703d9945b0ecb25259e (diff) | |
parent | 04722339d7598ff0c52f11c3680ed2dd922e6768 (diff) | |
download | rq-watcher.tar.gz |
Merge branch 'master' of github.com:rq/rq into watcherwatcher
36 files changed, 933 insertions, 390 deletions
diff --git a/.github/workflows/dependencies.yml b/.github/workflows/dependencies.yml index b2b5915..0ce9588 100644 --- a/.github/workflows/dependencies.yml +++ b/.github/workflows/dependencies.yml @@ -26,7 +26,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Start Redis - uses: supercharge/redis-github-action@1.4.0 + uses: supercharge/redis-github-action@1.5.0 with: redis-version: ${{ matrix.redis-version }} @@ -61,7 +61,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Start Redis - uses: supercharge/redis-github-action@1.4.0 + uses: supercharge/redis-github-action@1.5.0 with: redis-version: ${{ matrix.redis-version }} diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index d089f80..586529d 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -28,7 +28,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Start Redis - uses: supercharge/redis-github-action@1.4.0 + uses: supercharge/redis-github-action@1.5.0 with: redis-version: ${{ matrix.redis-version }} @@ -47,3 +47,4 @@ jobs: uses: codecov/codecov-action@v3 with: file: ./coverage.xml + fail_ci_if_error: false @@ -1,3 +1,15 @@ +### RQ 1.13.0 (2023-02-19) +* Added `work_horse_killed_handler` argument to `Worker`. Thanks @ronlut! +* Fixed an issue where results aren't properly persisted on synchronous jobs. Thanks @selwin! +* Fixed a bug where job results are not properly persisted when `result_ttl` is `-1`. Thanks @sim6! +* Various documentation and logging fixes. Thanks @lowercase00! +* Improve Redis connection reliability. Thanks @lowercase00! +* Scheduler reliability improvements. Thanks @OlegZv and @lowercase00! +* Fixed a bug where `dequeue_timeout` ignores `worker_ttl`. Thanks @ronlut! +* Use `job.return_value()` instead of `job.result` when processing callbacks. Thanks @selwin! +* Various internal refactorings to make `Worker` code more easily extendable. Thanks @lowercase00! 
+* RQ's source code is now black formatted. Thanks @aparcar! + ### RQ 1.12.0 (2023-01-15) * RQ now stores multiple job execution results. This feature is only available on Redis >= 5.0 Redis Streams. Please refer to [the docs](https://python-rq.org/docs/results/) for more info. Thanks @selwin! * Improve performance when enqueueing many jobs at once. Thanks @rggjan! diff --git a/docs/docs/connections.md b/docs/docs/connections.md index 8e563d9..4382cef 100644 --- a/docs/docs/connections.md +++ b/docs/docs/connections.md @@ -3,61 +3,20 @@ title: "RQ: Connections" layout: docs --- -Although RQ features the `use_connection()` command for convenience, it -is deprecated, since it pollutes the global namespace. Instead, prefer explicit -connection management using the `with Connection(...):` context manager, or -pass in Redis connection references to queues directly. +### The connection parameter - -## Single Redis connection (easy) - -<div class="warning"> - <img style="float: right; margin-right: -60px; margin-top: -38px" src="/img/warning.png" /> - <strong>Note:</strong> - <p> - The use of <code>use_connection</code> is deprecated. - Please don't use <code>use_connection</code> in your scripts. - Instead, use explicit connection management. - </p> -</div> - -In development mode, to connect to a default, local Redis server: - -```python -from rq import use_connection -use_connection() -``` - -In production, to connect to a specific Redis server: +Each RQ object (queues, workers, jobs) has a `connection` keyword +argument that can be passed to the constructor - this is the recommended way of handling connections. ```python from redis import Redis -from rq import use_connection +from rq import Queue -redis = Redis('my.host.org', 6789, password='secret') -use_connection(redis) +redis = Redis('localhost', 6789) +q = Queue(connection=redis) ``` -Be aware of the fact that `use_connection` pollutes the global namespace. 
It -also implies that you can only ever use a single connection. - - -## Multiple Redis connections - -However, the single connection pattern facilitates only those cases where you -connect to a single Redis instance, and where you affect global context (by -replacing the existing connection with the `use_connection()` call). You can -only use this pattern when you are in full control of your web stack. - -In any other situation, or when you want to use multiple connections, you -should use `Connection` contexts or pass connections around explicitly. - - -### Explicit connections (precise, but tedious) - -Each RQ object instance (queues, workers, jobs) has a `connection` keyword -argument that can be passed to the constructor. Using this, you don't need to -use `use_connection()`. Instead, you can create your queues like this: +This pattern allows for different connections to be passed to different objects: ```python from rq import Queue @@ -73,11 +32,19 @@ q2 = Queue('bar', connection=conn2) Every job that is enqueued on a queue will know what connection it belongs to. The same goes for the workers. -This approach is very precise, but rather verbose, and therefore, tedious. - ### Connection contexts (precise and concise) +<div class="warning"> + <img style="float: right; margin-right: -60px; margin-top: -38px" src="/img/warning.png" /> + <strong>Note:</strong> + <p> + The use of <code>Connection</code> context manager is deprecated. + Please don't use <code>Connection</code> in your scripts. + Instead, use explicit connection management. + </p> +</div> + There is a better approach if you want to use multiple connections, though. 
Each RQ object instance, upon creation, will use the topmost Redis connection on the RQ connection stack, which is a mechanism to temporarily replace the @@ -136,11 +103,21 @@ Using this setting in conjunction with the systemd or docker containers with the automatic restart option allows workers and RQ to have a fault-tolerant connection to the redis. ```python -SENTINEL: {'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)], - 'SOCKET_TIMEOUT': None, - 'PASSWORD': 'secret', - 'DB': 2, - 'MASTER_NAME': 'master'} +SENTINEL: { + 'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)], + 'MASTER_NAME': 'master', + 'DB': 2, + 'USERNAME': 'redis-user', + 'PASSWORD': 'redis-secret', + 'SOCKET_TIMEOUT': None, + 'CONNECTION_KWARGS': { # Eventual addition Redis connection arguments + 'ssl_ca_path': None, + }, + 'SENTINEL_KWARGS': { # Eventual Sentinels connections arguments + 'username': 'sentinel-user', + 'password': 'sentinel-secret', + }, +} ``` @@ -162,3 +139,17 @@ q = Queue(connection=conn) Setting a `socket_timeout` with a lower value than the `default_worker_ttl` will cause a `TimeoutError` since it will interrupt the worker while it gets new jobs from the queue. + + +### Encoding / Decoding + +The encoding and decoding of Redis objects occur in multiple locations within the codebase, +which means that the `decode_responses=True` argument of the Redis connection is not currently supported. 
+ +```python +from redis import Redis +from rq import Queue + +conn = Redis(..., decode_responses=True) # This is not supported +q = Queue(connection=conn) +``` diff --git a/docs/docs/exceptions.md b/docs/docs/exceptions.md index f757f2c..2ab8473 100644 --- a/docs/docs/exceptions.md +++ b/docs/docs/exceptions.md @@ -119,3 +119,36 @@ use a custom exception handler that doesn't fall through, for example: def black_hole(job, *exc_info): return False ``` + +## Work Horse Killed Handler +_New in version 1.13.0._ + +In addition to job exception handler(s), RQ supports registering a handler for unexpected workhorse termination. +This handler is called when a workhorse is unexpectedly terminated, for example due to OOM. + +This is how you set a workhorse termination handler to an RQ worker: + +```python +from my_handlers import my_work_horse_killed_handler + +w = Worker([q], work_horse_killed_handler=my_work_horse_killed_handler) +``` + +The handler itself is a function that takes the following parameters: `job`, +`retpid`, `ret_val` and `rusage`: + +```python +from resource import struct_rusage +from rq.job import Job +def my_work_horse_killed_handler(job: Job, retpid: int, ret_val: int, rusage: struct_rusage): + # do your thing here, for example set job.retries_left to 0 + +``` + +## Built-in Exceptions +RQ Exceptions you can get in your job failure callbacks + +# AbandonedJobError +This error means an unfinished job was collected by another worker's maintenance task. +This usually happens when a worker is busy with a job and is terminated before it finished that job. +Another worker collects this job and moves it to the FailedJobRegistry.
\ No newline at end of file diff --git a/docs/docs/index.md b/docs/docs/index.md index bbf7841..1c24c6b 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -202,6 +202,21 @@ If you want to execute a function whenever a job completes or fails, RQ provides queue.enqueue(say_hello, on_success=report_success, on_failure=report_failure) ``` +### Callback Class and Callback Timeouts + +_New in version 1.14.0_ + +RQ lets you configure the method and timeout for each callback - success and failure. +To configure callback timeouts, use RQ's +`Callback` object that accepts `func` and `timeout` arguments. For example: + +```python +from rq import Callback +queue.enqueue(say_hello, + on_success=Callback(report_success), # default callback timeout (60 seconds) + on_failure=Callback(report_failure, timeout=10)) # 10 seconds timeout +``` + ### Success Callback Success callbacks must be a function that accepts `job`, `connection` and `result` arguments. diff --git a/docs/docs/scheduling.md b/docs/docs/scheduling.md index 03aa1fa..cb297fa 100644 --- a/docs/docs/scheduling.md +++ b/docs/docs/scheduling.md @@ -95,8 +95,8 @@ from rq import Worker, Queue from redis import Redis redis = Redis() - queue = Queue(connection=redis) + worker = Worker(queues=[queue], connection=redis) worker.work(with_scheduler=True) ``` @@ -113,5 +113,26 @@ working. This way, if a worker with active scheduler dies, the scheduling work w up by other workers with the scheduling component enabled. +## Safe importing of the worker module + +When running the worker programmatically with the scheduler, you must keep in mind that the +import must be protected with `if __name__ == '__main__'`. The scheduler runs on its own process +(using `multiprocessing` from the stdlib), so the newly spawned process must be able to safely import the module without +causing any side effects (starting a new process on top of the main ones). + +```python +... 
+ +# When running `with_scheduler=True` this is necessary +if __name__ == '__main__': + worker = Worker(queues=[queue], connection=redis) + worker.work(with_scheduler=True) + +... +# When running without the scheduler this is fine +worker = Worker(queues=[queue], connection=redis) +worker.work() +``` +More information on the Python official docs [here](https://docs.python.org/3.7/library/multiprocessing.html#the-spawn-and-forkserver-start-methods). diff --git a/docs/docs/workers.md b/docs/docs/workers.md index d19451f..52b4ff3 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -69,6 +69,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments: * `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'` * `--disable-job-desc-logging`: Turn off job description logging. * `--max-jobs`: Maximum number of jobs to execute. +* `--dequeue-strategy`: The strategy to dequeue jobs from multiple queues (one of `default`, `random` or `round_robin`, defaults to `default`) _New in version 1.8.0._ * `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer") @@ -317,19 +318,21 @@ $ rq worker -w 'path.to.GeventWorker' The default worker considers the order of queues as their priority order, and if a task is pending in a higher priority queue -it will be selected before any other in queues with lower priority. +it will be selected before any other in queues with lower priority (the `default` behavior). +To choose the strategy that should be used, `rq` provides the `--dequeue-strategy / -ds` option. In certain circumstances it can be useful that a when a worker is listening to multiple queues, say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th -from `q1`, the 5th from `q2` and so on. The custom worker class `rq.worker.RoundRobinWorker` -implements this strategy. 
+from `q1`, the 5th from `q2` and so on. To implement this strategy use `-ds round_robin` argument. -In some other circumstances, when a worker is listening to multiple queues, it can be useful -to pull jobs from the different queues randomly. The custom class `rq.worker.RandomWorker` -implements this strategy. In fact, whenever a job is pulled from any queue, the list of queues is +In other circumstances, it can be useful to pull jobs from the different queues randomly. +To implement this strategy use `-ds random` argument. +In fact, whenever a job is pulled from any queue with the `random` strategy, the list of queues is shuffled, so that no queue has more priority than the other ones. +Deprecation Warning: Those strategies were formerly implemented by using the custom classes `rq.worker.RoundRobinWorker` +and `rq.worker.RandomWorker`. As the `--dequeue-strategy` argument allows for this option to be used with any worker, those worker classes are deprecated and will be removed from future versions. 
## Custom Job and Queue Classes diff --git a/docs/patterns/index.md b/docs/patterns/index.md index 23f67a4..818dc14 100644 --- a/docs/patterns/index.md +++ b/docs/patterns/index.md @@ -12,24 +12,24 @@ To setup RQ on [Heroku][1], first add it to your rq>=0.13 Create a file called `run-worker.py` with the following content (assuming you -are using [Redis To Go][2] with Heroku): +are using [Heroku Data For Redis][2] with Heroku): ```python import os -import urlparse +import redis from redis import Redis from rq import Queue, Connection from rq.worker import HerokuWorker as Worker + listen = ['high', 'default', 'low'] -redis_url = os.getenv('REDISTOGO_URL') +redis_url = os.getenv('REDIS_URL') if not redis_url: - raise RuntimeError('Set up Redis To Go first.') - -urlparse.uses_netloc.append('redis') -url = urlparse.urlparse(redis_url) -conn = Redis(host=url.hostname, port=url.port, db=0, password=url.password) + raise RuntimeError("Set up Heroku Data For Redis first, \ + make sure its config var is named 'REDIS_URL'.") + +conn = redis.from_url(redis_url) if __name__ == '__main__': with Connection(conn): @@ -47,7 +47,7 @@ Now, all you have to do is spin up a worker: $ heroku scale worker=1 ``` -If you are using [Heroku Redis][5]) you might need to change the Redis connection as follows: +If the from_url function fails to parse your credentials, you might need to do so manually: ```console conn = redis.Redis( @@ -58,6 +58,7 @@ conn = redis.Redis( ssl_cert_reqs=None ) ``` +The details are from the 'settings' page of your Redis add-on on the Heroku dashboard. 
and for using the cli: @@ -90,7 +91,7 @@ force stdin, stdout and stderr to be totally unbuffered): worker: python -u run-worker.py [1]: https://heroku.com -[2]: https://devcenter.heroku.com/articles/redistogo +[2]: https://devcenter.heroku.com/articles/heroku-redis [3]: https://github.com/ddollar/foreman [4]: https://github.com/ddollar/foreman/wiki/Missing-Output [5]: https://elements.heroku.com/addons/heroku-redis diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4787b13 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.black] +line-length = 120 +target-version = ['py36'] +skip-string-normalization = true
\ No newline at end of file diff --git a/rq/__init__.py b/rq/__init__.py index d5db681..0ab7065 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,7 +1,7 @@ # flake8: noqa -from .connections import Connection, get_current_connection, pop_connection, push_connection, use_connection -from .job import cancel_job, get_current_job, requeue_job, Retry +from .connections import Connection, get_current_connection, pop_connection, push_connection +from .job import cancel_job, get_current_job, requeue_job, Retry, Callback from .queue import Queue from .version import VERSION from .worker import SimpleWorker, Worker diff --git a/rq/cli/cli.py b/rq/cli/cli.py index f781010..27058e8 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -5,6 +5,7 @@ RQ command line tool from functools import update_wrapper import os import sys +import warnings import click from redis.exceptions import ConnectionError @@ -32,7 +33,7 @@ from rq.defaults import ( DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT, - DEFAULT_SERIALIZER_CLASS, + DEFAULT_SERIALIZER_CLASS, DEFAULT_MAINTENANCE_TASK_INTERVAL, ) from rq.exceptions import InvalidJobOperationError from rq.registry import FailedJobRegistry, clean_registries @@ -104,7 +105,8 @@ def empty(cli_config, all, queues, serializer, **options): if all: queues = cli_config.queue_class.all( - connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer + connection=cli_config.connection, job_class=cli_config.job_class, + death_penalty_class=cli_config.death_penalty_class, serializer=serializer ) else: queues = [ @@ -174,7 +176,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, try: with Connection(cli_config.connection): - if queues: qs = list(map(cli_config.queue_class, queues)) else: @@ -200,7 +201,13 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--date-format', type=str, 
default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs') @click.option('--name', '-n', help='Specify a different name') @click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used') -@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used') +@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Worker timeout to be used') +@click.option( + '--maintenance-interval', + type=int, + default=DEFAULT_MAINTENANCE_TASK_INTERVAL, + help='Maintenance task interval (in seconds) to be used' +) @click.option( '--job-monitoring-interval', type=int, @@ -217,8 +224,10 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') +@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') +@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues') @click.argument('queues', nargs=-1) @pass_cli_config def worker( @@ -228,6 +237,7 @@ def worker( name, results_ttl, worker_ttl, + maintenance_interval, job_monitoring_interval, disable_job_desc_logging, verbose, @@ -239,12 +249,14 @@ def worker( pid, disable_default_exception_handler, max_jobs, + max_idle_time, with_scheduler, queues, log_format, date_format, serializer, - **options + dequeue_strategy, + **options, ): """Starts an RQ worker.""" settings = 
read_config_file(cli_config.config) if cli_config.config else {} @@ -259,6 +271,17 @@ def worker( with open(os.path.expanduser(pid), "w") as fp: fp.write(str(os.getpid())) + worker_name = cli_config.worker_class.__qualname__ + if worker_name in ["RoundRobinWorker", "RandomWorker"]: + strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin" + msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy." + warnings.warn(msg, DeprecationWarning) + click.secho(msg, fg='yellow') + + if dequeue_strategy not in ("default", "random", "round_robin"): + click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red') + sys.exit(1) + setup_loghandlers_from_args(verbose, quiet, date_format, log_format) try: @@ -283,13 +306,14 @@ def worker( connection=cli_config.connection, default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, + maintenance_interval=maintenance_interval, job_monitoring_interval=job_monitoring_interval, job_class=cli_config.job_class, queue_class=cli_config.queue_class, exception_handlers=exception_handlers or None, disable_default_exception_handler=disable_default_exception_handler, log_job_description=not disable_job_desc_logging, - serializer=serializer, + serializer=serializer ) # Should we configure Sentry? 
@@ -309,7 +333,9 @@ def worker( date_format=date_format, log_format=log_format, max_jobs=max_jobs, + max_idle_time=max_idle_time, with_scheduler=with_scheduler, + dequeue_strategy=dequeue_strategy ) except ConnectionError as e: print(e) @@ -391,7 +417,7 @@ def enqueue( serializer, function, arguments, - **options + **options, ): """Enqueues a job from the command line""" args, kwargs = parse_function_args(arguments) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index fb20109..d7238ed 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -13,7 +13,8 @@ from shutil import get_terminal_size import click from redis import Redis from redis.sentinel import Sentinel -from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS +from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, \ + DEFAULT_DEATH_PENALTY_CLASS from rq.logutils import setup_loghandlers from rq.utils import import_attribute, parse_timeout from rq.worker import WorkerStatus @@ -33,21 +34,27 @@ def get_redis_from_config(settings, connection_class=Redis): """Returns a StrictRedis instance from a dictionary of settings. To use redis sentinel, you must specify a dictionary in the configuration file. 
Example of a dictionary with keys without values: - SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'PASSWORD':,'DB':, 'MASTER_NAME':} + SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'USERNAME':, 'PASSWORD':, 'DB':, 'MASTER_NAME':, 'SENTINEL_KWARGS':} """ if settings.get('REDIS_URL') is not None: return connection_class.from_url(settings['REDIS_URL']) elif settings.get('SENTINEL') is not None: instances = settings['SENTINEL'].get('INSTANCES', [('localhost', 26379)]) - socket_timeout = settings['SENTINEL'].get('SOCKET_TIMEOUT', None) - password = settings['SENTINEL'].get('PASSWORD', None) - db = settings['SENTINEL'].get('DB', 0) master_name = settings['SENTINEL'].get('MASTER_NAME', 'mymaster') - ssl = settings['SENTINEL'].get('SSL', False) - arguments = {'password': password, 'ssl': ssl} + + connection_kwargs = { + 'db': settings['SENTINEL'].get('DB', 0), + 'username': settings['SENTINEL'].get('USERNAME', None), + 'password': settings['SENTINEL'].get('PASSWORD', None), + 'socket_timeout': settings['SENTINEL'].get('SOCKET_TIMEOUT', None), + 'ssl': settings['SENTINEL'].get('SSL', False), + } + connection_kwargs.update(settings['SENTINEL'].get('CONNECTION_KWARGS', {})) + sentinel_kwargs = settings['SENTINEL'].get('SENTINEL_KWARGS', {}) + sn = Sentinel( - instances, socket_timeout=socket_timeout, password=password, db=db, ssl=ssl, sentinel_kwargs=arguments + instances, sentinel_kwargs=sentinel_kwargs, **connection_kwargs ) return sn.master_for(master_name) @@ -100,7 +107,6 @@ def state_symbol(state): def show_queues(queues, raw, by_queue, queue_class, worker_class): - num_jobs = 0 termwidth = get_terminal_size().columns chartwidth = min(20, termwidth - 20) @@ -141,7 +147,6 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class): workers.add(worker) if not by_queue: - for worker in workers: queue_names = ', '.join(worker.queue_names()) name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid) @@ -298,6 +303,7 @@ class CliConfig: 
config=None, worker_class=DEFAULT_WORKER_CLASS, job_class=DEFAULT_JOB_CLASS, + death_penalty_class=DEFAULT_DEATH_PENALTY_CLASS, queue_class=DEFAULT_QUEUE_CLASS, connection_class=DEFAULT_CONNECTION_CLASS, path=None, @@ -322,6 +328,11 @@ class CliConfig: raise click.BadParameter(str(exc), param_hint='--job-class') try: + self.death_penalty_class = import_attribute(death_penalty_class) + except (ImportError, AttributeError) as exc: + raise click.BadParameter(str(exc), param_hint='--death-penalty-class') + + try: self.queue_class = import_attribute(queue_class) except (ImportError, AttributeError) as exc: raise click.BadParameter(str(exc), param_hint='--queue-class') diff --git a/rq/connections.py b/rq/connections.py index 413ee5a..0d39e43 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,9 +1,10 @@ +import warnings from contextlib import contextmanager from typing import Optional -import warnings + from redis import Redis -from .local import LocalStack, release_local +from .local import LocalStack class NoRedisConnectionException(Exception): @@ -31,7 +32,8 @@ def Connection(connection: Optional['Redis'] = None): # noqa connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None. """ warnings.warn( - "The Conneciton context manager is deprecated. Use the `connection` parameter instead.", DeprecationWarning + "The Connection context manager is deprecated. Use the `connection` parameter instead.", + DeprecationWarning, ) if connection is None: connection = Redis() @@ -41,7 +43,8 @@ def Connection(connection: Optional['Redis'] = None): # noqa finally: popped = pop_connection() assert popped == connection, ( - 'Unexpected Redis connection was popped off the stack. ' 'Check your Redis connection setup.' + 'Unexpected Redis connection was popped off the stack. ' + 'Check your Redis connection setup.' 
) @@ -52,6 +55,10 @@ def push_connection(redis: 'Redis'): Args: redis (Redis): A Redis connection """ + warnings.warn( + "The `push_connection` function is deprecated. Pass the `connection` explicitly instead.", + DeprecationWarning, + ) _connection_stack.push(redis) @@ -62,25 +69,13 @@ def pop_connection() -> 'Redis': Returns: redis (Redis): A Redis connection """ + warnings.warn( + "The `pop_connection` function is deprecated. Pass the `connection` explicitly instead.", + DeprecationWarning, + ) return _connection_stack.pop() -def use_connection(redis: Optional['Redis'] = None): - """ - Clears the stack and uses the given connection. Protects against mixed - use of use_connection() and stacked connection contexts. - - Args: - redis (Optional[Redis], optional): A Redis Connection. Defaults to None. - """ - assert len(_connection_stack) <= 1, 'You should not mix Connection contexts with use_connection()' - release_local(_connection_stack) - - if redis is None: - redis = Redis() - push_connection(redis) - - def get_current_connection() -> 'Redis': """ Returns the current Redis connection (i.e. the topmost on the @@ -89,6 +84,10 @@ def get_current_connection() -> 'Redis': Returns: Redis: A Redis Connection """ + warnings.warn( + "The `get_current_connection` function is deprecated. Pass the `connection` explicitly instead.", + DeprecationWarning, + ) return _connection_stack.top @@ -106,7 +105,10 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis': Returns: Redis: A Redis Connection """ - + warnings.warn( + "The `resolve_connection` function is deprecated. 
Pass the `connection` explicitly instead.", + DeprecationWarning, + ) if connection is not None: return connection @@ -118,4 +120,6 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis': _connection_stack = LocalStack() -__all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection', 'use_connection'] + +__all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection'] + diff --git a/rq/defaults.py b/rq/defaults.py index bd50489..2a3d57a 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -88,3 +88,9 @@ DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' Uses Python's default attributes as defined https://docs.python.org/3/library/logging.html#logrecord-attributes """ + + +DEFAULT_DEATH_PENALTY_CLASS = 'rq.timeouts.UnixSignalDeathPenalty' +""" The path for the default Death Penalty class to use. +Defaults to the `UnixSignalDeathPenalty` class within the `rq.timeouts` module +"""
\ No newline at end of file diff --git a/rq/exceptions.py b/rq/exceptions.py index ee51753..b84ade1 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -30,3 +30,7 @@ class ShutDownImminentException(Exception): class TimeoutFormatError(Exception): pass + + +class AbandonedJobError(Exception): + pass @@ -1,16 +1,18 @@ import inspect import json +import logging import warnings import zlib import asyncio -from collections.abc import Iterable from datetime import datetime, timedelta, timezone from enum import Enum from redis import WatchError -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, Type from uuid import uuid4 +from .defaults import CALLBACK_TIMEOUT +from .timeouts import JobTimeoutException, BaseDeathPenalty if TYPE_CHECKING: from .results import Result @@ -36,6 +38,8 @@ from .utils import ( utcnow, ) +logger = logging.getLogger("rq.job") + class JobStatus(str, Enum): """The Status of Job within its lifecycle at any given time.""" @@ -153,8 +157,8 @@ class Job: failure_ttl: Optional[int] = None, serializer=None, *, - on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None + on_success: Optional[Union['Callback', Callable[..., Any]]] = None, + on_failure: Optional[Union['Callback', Callable[..., Any]]] = None ) -> 'Job': """Creates a new Job instance for the given function, arguments, and keyword arguments. 
@@ -234,14 +238,20 @@ class Job: job._kwargs = kwargs if on_success: - if not inspect.isfunction(on_success) and not inspect.isbuiltin(on_success): - raise ValueError('on_success callback must be a function') - job._success_callback_name = '{0}.{1}'.format(on_success.__module__, on_success.__qualname__) + if not isinstance(on_success, Callback): + warnings.warn('Passing a `Callable` `on_success` is deprecated, pass `Callback` instead', + DeprecationWarning) + on_success = Callback(on_success) # backward compatibility + job._success_callback_name = on_success.name + job._success_callback_timeout = on_success.timeout if on_failure: - if not inspect.isfunction(on_failure) and not inspect.isbuiltin(on_failure): - raise ValueError('on_failure callback must be a function') - job._failure_callback_name = '{0}.{1}'.format(on_failure.__module__, on_failure.__qualname__) + if not isinstance(on_failure, Callback): + warnings.warn('Passing a `Callable` `on_failure` is deprecated, pass `Callback` instead', + DeprecationWarning) + on_failure = Callback(on_failure) # backward compatibility + job._failure_callback_name = on_failure.name + job._failure_callback_timeout = on_failure.timeout # Extra meta data job.description = description or job.get_call_string() @@ -254,12 +264,17 @@ class Job: # dependency could be job instance or id, or iterable thereof if depends_on is not None: - if isinstance(depends_on, Dependency): - job.enqueue_at_front = depends_on.enqueue_at_front - job.allow_dependency_failures = depends_on.allow_failure - depends_on_list = depends_on.dependencies - else: - depends_on_list = ensure_list(depends_on) + depends_on = ensure_list(depends_on) + depends_on_list = [] + for depends_on_item in depends_on: + if isinstance(depends_on_item, Dependency): + # If a Dependency has enqueue_at_front or allow_failure set to True, these behaviors are used for + # all dependencies. 
+ job.enqueue_at_front = job.enqueue_at_front or depends_on_item.enqueue_at_front + job.allow_dependency_failures = job.allow_dependency_failures or depends_on_item.allow_failure + depends_on_list.extend(depends_on_item.dependencies) + else: + depends_on_list.extend(ensure_list(depends_on_item)) job._dependency_ids = [dep.id if isinstance(dep, Job) else dep for dep in depends_on_list] return job @@ -397,6 +412,13 @@ class Job: return self._success_callback @property + def success_callback_timeout(self) -> int: + if self._success_callback_timeout is None: + return CALLBACK_TIMEOUT + + return self._success_callback_timeout + + @property def failure_callback(self): if self._failure_callback is UNEVALUATED: if self._failure_callback_name: @@ -406,6 +428,13 @@ class Job: return self._failure_callback + @property + def failure_callback_timeout(self) -> int: + if self._failure_callback_timeout is None: + return CALLBACK_TIMEOUT + + return self._failure_callback_timeout + def _deserialize_data(self): """Deserializes the Job `data` into a tuple. 
This includes the `_func_name`, `_instance`, `_args` and `_kwargs` @@ -575,6 +604,8 @@ class Job: self._result = None self._exc_info = None self.timeout: Optional[float] = None + self._success_callback_timeout: Optional[int] = None + self._failure_callback_timeout: Optional[int] = None self.result_ttl: Optional[int] = None self.failure_ttl: Optional[int] = None self.ttl: Optional[int] = None @@ -862,9 +893,15 @@ class Job: if obj.get('success_callback_name'): self._success_callback_name = obj.get('success_callback_name').decode() + if 'success_callback_timeout' in obj: + self._success_callback_timeout = int(obj.get('success_callback_timeout')) + if obj.get('failure_callback_name'): self._failure_callback_name = obj.get('failure_callback_name').decode() + if 'failure_callback_timeout' in obj: + self._failure_callback_timeout = int(obj.get('failure_callback_timeout')) + dep_ids = obj.get('dependency_ids') dep_id = obj.get('dependency_id') # for backwards compatibility self._dependency_ids = json.loads(dep_ids.decode()) if dep_ids else [dep_id.decode()] if dep_id else [] @@ -942,6 +979,10 @@ class Job: obj['exc_info'] = zlib.compress(str(self._exc_info).encode('utf-8')) if self.timeout is not None: obj['timeout'] = self.timeout + if self._success_callback_timeout is not None: + obj['success_callback_timeout'] = self._success_callback_timeout + if self._failure_callback_timeout is not None: + obj['failure_callback_timeout'] = self._failure_callback_timeout if self.result_ttl is not None: obj['result_ttl'] = self.result_ttl if self.failure_ttl is not None: @@ -1303,6 +1344,35 @@ class Job: self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer ) + def execute_success_callback(self, death_penalty_class: Type[BaseDeathPenalty], result: Any): + """Executes success_callback for a job. + with timeout . + + Args: + death_penalty_class (Type[BaseDeathPenalty]): The penalty class to use for timeout + result (Any): The job's result. 
+ """ + if not self.success_callback: + return + + logger.debug('Running success callbacks for %s', self.id) + with death_penalty_class(self.success_callback_timeout, JobTimeoutException, job_id=self.id): + self.success_callback(self, self.connection, result) + + def execute_failure_callback(self, death_penalty_class: Type[BaseDeathPenalty], *exc_info): + """Executes failure_callback with possible timeout + """ + if not self.failure_callback: + return + + logger.debug('Running failure callbacks for %s', self.id) + try: + with death_penalty_class(self.failure_callback_timeout, JobTimeoutException, job_id=self.id): + self.failure_callback(self, self.connection, *exc_info) + except Exception: # noqa + logger.exception(f'Job {self.id}: error while executing failure callback') + raise + def _handle_success(self, result_ttl: int, pipeline: 'Pipeline'): """Saves and cleanup job after successful execution""" # self.log.debug('Setting job %s status to finished', job.id) @@ -1317,9 +1387,8 @@ class Job: # for backward compatibility if self.supports_redis_streams: from .results import Result - Result.create( - self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline - ) + + Result.create(self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline) if result_ttl != 0: finished_job_registry = self.finished_job_registry @@ -1339,6 +1408,7 @@ class Job: ) if self.supports_redis_streams: from .results import Result + Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline) def get_retry_interval(self) -> int: @@ -1372,7 +1442,7 @@ class Job: self.set_status(JobStatus.SCHEDULED) queue.schedule_job(self, scheduled_datetime, pipeline=pipeline) else: - queue.enqueue_job(self, pipeline=pipeline) + queue._enqueue_job(self, pipeline=pipeline) def register_dependency(self, pipeline: Optional['Pipeline'] = None): """Jobs may have dependencies. 
Jobs are enqueued only if the jobs they @@ -1502,3 +1572,16 @@ class Retry: self.max = max self.intervals = intervals + + +class Callback: + def __init__(self, func: Callable[..., Any], timeout: Optional[Any] = None): + if not inspect.isfunction(func) and not inspect.isbuiltin(func): + raise ValueError('Callback func must be a function') + + self.func = func + self.timeout = parse_timeout(timeout) if timeout else CALLBACK_TIMEOUT + + @property + def name(self) -> str: + return '{0}.{1}'.format(self.func.__module__, self.func.__qualname__) diff --git a/rq/local.py b/rq/local.py index 8e94457..e6b070b 100644 --- a/rq/local.py +++ b/rq/local.py @@ -15,7 +15,7 @@ try: from greenlet import getcurrent as get_ident except ImportError: # noqa try: - from thread import get_ident # noqa + from threading import get_ident # noqa except ImportError: # noqa try: from _thread import get_ident # noqa @@ -296,18 +296,6 @@ class LocalProxy: return '<%s unbound>' % self.__class__.__name__ return repr(obj) - def __nonzero__(self): - try: - return bool(self._get_current_object()) - except RuntimeError: - return False - - def __unicode__(self): - try: - return unicode(self._get_current_object()) - except RuntimeError: - return repr(self) - def __dir__(self): try: return dir(self._get_current_object()) @@ -325,12 +313,6 @@ class LocalProxy: def __delitem__(self, key): del self._get_current_object()[key] - def __setslice__(self, i, j, seq): - self._get_current_object()[i:j] = seq - - def __delslice__(self, i, j): - del self._get_current_object()[i:j] - __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v) __delattr__ = lambda x, n: delattr(x._get_current_object(), n) __str__ = lambda x: str(x._get_current_object()) @@ -340,14 +322,12 @@ class LocalProxy: __ne__ = lambda x, o: x._get_current_object() != o __gt__ = lambda x, o: x._get_current_object() > o __ge__ = lambda x, o: x._get_current_object() >= o - __cmp__ = lambda x, o: cmp(x._get_current_object(), o) __hash__ = 
lambda x: hash(x._get_current_object()) __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw) __len__ = lambda x: len(x._get_current_object()) __getitem__ = lambda x, i: x._get_current_object()[i] __iter__ = lambda x: iter(x._get_current_object()) __contains__ = lambda x, i: i in x._get_current_object() - __getslice__ = lambda x, i, j: x._get_current_object()[i:j] __add__ = lambda x, o: x._get_current_object() + o __sub__ = lambda x, o: x._get_current_object() - o __mul__ = lambda x, o: x._get_current_object() * o @@ -373,6 +353,5 @@ class LocalProxy: __oct__ = lambda x: oct(x._get_current_object()) __hex__ = lambda x: hex(x._get_current_object()) __index__ = lambda x: x._get_current_object().__index__() - __coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o) __enter__ = lambda x: x._get_current_object().__enter__() __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw) diff --git a/rq/queue.py b/rq/queue.py index 3c483e8..77a6f3e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -10,6 +10,8 @@ from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Ty from redis import WatchError +from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty + if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline @@ -62,13 +64,15 @@ class EnqueueData( @total_ordering class Queue: job_class: Type['Job'] = Job + death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty DEFAULT_TIMEOUT: int = 180 # Default timeout seconds. redis_queue_namespace_prefix: str = 'rq:queue:' redis_queues_keys: str = 'rq:queues' @classmethod def all( - cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None + cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, + serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None ) -> List['Queue']: """Returns an iterable of all Queues. 
@@ -76,6 +80,7 @@ class Queue: connection (Optional[Redis], optional): The Redis Connection. Defaults to None. job_class (Optional[Job], optional): The Job class to use. Defaults to None. serializer (optional): The serializer to use. Defaults to None. + death_penalty_class (Optional[Job], optional): The Death Penalty class to use. Defaults to None. Returns: queues (List[Queue]): A list of all queues. @@ -84,7 +89,8 @@ class Queue: def to_queue(queue_key): return cls.from_queue_key( - as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer + as_text(queue_key), connection=connection, job_class=job_class, + serializer=serializer, death_penalty_class=death_penalty_class ) all_registerd_queues = connection.smembers(cls.redis_queues_keys) @@ -96,8 +102,9 @@ class Queue: cls, queue_key: str, connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, + job_class: Optional[Type['Job']] = None, serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their @@ -108,6 +115,7 @@ class Queue: connection (Optional[Redis], optional): Redis connection. Defaults to None. job_class (Optional[Job], optional): Job class. Defaults to None. serializer (Any, optional): Serializer. Defaults to None. + death_penalty_class (Optional[BaseDeathPenalty], optional): Death penalty class. Defaults to None. 
Raises: ValueError: If the queue_key doesn't start with the defined prefix @@ -119,7 +127,8 @@ class Queue: if not queue_key.startswith(prefix): raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) name = queue_key[len(prefix):] - return cls(name, connection=connection, job_class=job_class, serializer=serializer) + return cls(name, connection=connection, job_class=job_class, serializer=serializer, + death_penalty_class=death_penalty_class) def __init__( self, @@ -129,6 +138,7 @@ class Queue: is_async: bool = True, job_class: Union[str, Type['Job'], None] = None, serializer: Any = None, + death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty, **kwargs, ): """Initializes a Queue object. @@ -141,6 +151,7 @@ class Queue: If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True. job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None. serializer (Any, optional): Serializer. Defaults to None. + death_penalty_class (Type[BaseDeathPenalty, optional): Job class or a string referencing the Job class path. Defaults to UnixSignalDeathPenalty. 
""" self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix @@ -159,6 +170,7 @@ class Queue: if isinstance(job_class, str): job_class = import_attribute(job_class) self.job_class = job_class + self.death_penalty_class = death_penalty_class self.serializer = resolve_serializer(serializer) self.redis_server_version: Optional[Tuple[int, int, int]] = None @@ -166,9 +178,6 @@ class Queue: def __len__(self): return self.count - def __nonzero__(self): - return True - def __bool__(self): return True @@ -336,7 +345,7 @@ class Queue: else: end = length job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)] - self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") + self.log.debug('Getting jobs for queue %s: %d found.', green(self.name), len(job_ids)) return job_ids def get_jobs(self, offset: int = 0, length: int = -1) -> List['Job']: @@ -455,7 +464,7 @@ class Queue: result = connection.lpush(self.key, job_id) else: result = connection.rpush(self.key, job_id) - self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") + self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result) def create_job( self, @@ -663,12 +672,7 @@ class Queue: on_success=on_success, on_failure=on_failure, ) - - job = self.setup_dependencies(job, pipeline=pipeline) - # If we do not depend on an unfinished job, enqueue the job. 
- if job.get_status(refresh=False) != JobStatus.DEFERRED: - return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) - return job + return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) @staticmethod def prepare_data( @@ -739,7 +743,7 @@ class Queue: """ pipe = pipeline if pipeline is not None else self.connection.pipeline() jobs = [ - self.enqueue_job( + self._enqueue_job( self.create_job( job_data.func, args=job_data.args, @@ -980,7 +984,26 @@ class Queue: return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs) def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: - """Enqueues a job for delayed execution. + """Enqueues a job for delayed execution checking dependencies. + + Args: + job (Job): The job to enqueue + pipeline (Optional[Pipeline], optional): The Redis pipeline to use. Defaults to None. + at_front (bool, optional): Whether should enqueue at the front of the queue. Defaults to False. + + Returns: + Job: The enqued job + """ + job.origin = self.name + job = self.setup_dependencies(job, pipeline=pipeline) + # If we do not depend on an unfinished job, enqueue the job. + if job.get_status(refresh=False) != JobStatus.DEFERRED: + return self._enqueue_job(job, pipeline=pipeline, at_front=at_front) + return job + + + def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: + """Enqueues a job for delayed execution without checking dependencies. If Queue is instantiated with is_async=False, job is executed immediately. 
@@ -1067,7 +1090,6 @@ class Queue: dependents_key = job.dependents_key while True: - try: # if a pipeline is passed, the caller is responsible for calling WATCH # to ensure all jobs are enqueued @@ -1107,10 +1129,10 @@ class Queue: registry.remove(dependent, pipeline=pipe) if dependent.origin == self.name: - self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) + self._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) else: queue = self.__class__(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) + queue._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) # Only delete dependents_key if all dependents have been enqueued if len(jobs_to_enqueue) == len(dependent_job_ids): @@ -1140,7 +1162,7 @@ class Queue: return as_text(self.connection.lpop(self.key)) @classmethod - def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None): + def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None): """Helper method. Intermediate method to abstract away from some Redis API details, where LPOP accepts only a single key, whereas BLPOP accepts multiple. So if we want the non-blocking LPOP, we need to @@ -1155,7 +1177,7 @@ class Queue: Args: queue_keys (_type_): _description_ - timeout (int): _description_ + timeout (Optional[int]): _description_ connection (Optional[Redis], optional): _description_. Defaults to None. Raises: @@ -1188,10 +1210,11 @@ class Queue: def dequeue_any( cls, queues: List['Queue'], - timeout: int, + timeout: Optional[int], connection: Optional['Redis'] = None, job_class: Optional['Job'] = None, serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. 
@@ -1205,10 +1228,11 @@ class Queue: Args: queues (List[Queue]): List of queue objects - timeout (int): Timeout for the LPOP + timeout (Optional[int]): Timeout for the LPOP connection (Optional[Redis], optional): Redis Connection. Defaults to None. - job_class (Optional[Job], optional): The job classification. Defaults to None. + job_class (Optional[Type[Job]], optional): The job class. Defaults to None. serializer (Any, optional): Serializer to use. Defaults to None. + death_penalty_class (Optional[Type[BaseDeathPenalty]], optional): The death penalty class. Defaults to None. Raises: e: Any exception @@ -1224,7 +1248,8 @@ class Queue: if result is None: return None queue_key, job_id = map(as_text, result) - queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, serializer=serializer) + queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, + serializer=serializer, death_penalty_class=death_penalty_class) try: job = job_class.fetch(job_id, connection=connection, serializer=serializer) except NoSuchJobError: diff --git a/rq/registry.py b/rq/registry.py index 509bd87..2bd874c 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,22 +1,30 @@ import calendar +import logging +import traceback + from rq.serializers import resolve_serializer import time from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, List, Optional, Type, Union +from .timeouts import JobTimeoutException, UnixSignalDeathPenalty, BaseDeathPenalty + if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline from .utils import as_text from .connections import resolve_connection -from .defaults import DEFAULT_FAILURE_TTL -from .exceptions import InvalidJobOperation, NoSuchJobError +from .defaults import DEFAULT_FAILURE_TTL, CALLBACK_TIMEOUT +from .exceptions import InvalidJobOperation, NoSuchJobError, AbandonedJobError from .job import Job, JobStatus from .queue import Queue from .utils import 
backend_class, current_timestamp +logger = logging.getLogger("rq.registry") + + class BaseRegistry: """ Base implementation of a job registry, implemented in Redis sorted set. @@ -25,6 +33,7 @@ class BaseRegistry: """ job_class = Job + death_penalty_class = UnixSignalDeathPenalty key_template = 'rq:registry:{0}' def __init__( @@ -34,6 +43,7 @@ class BaseRegistry: job_class: Optional[Type['Job']] = None, queue: Optional['Queue'] = None, serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ): if queue: self.name = queue.name @@ -46,6 +56,7 @@ class BaseRegistry: self.key = self.key_template.format(self.name) self.job_class = backend_class(self, 'job_class', override=job_class) + self.death_penalty_class = backend_class(self, 'death_penalty_class', override=death_penalty_class) def __len__(self): """Returns the number of jobs in this registry""" @@ -186,7 +197,7 @@ class BaseRegistry: job.ended_at = None job._exc_info = '' job.save() - job = queue.enqueue_job(job, pipeline=pipeline, at_front=at_front) + job = queue._enqueue_job(job, pipeline=pipeline, at_front=at_front) pipeline.execute() return job @@ -204,7 +215,7 @@ class StartedJobRegistry(BaseRegistry): key_template = 'rq:wip:{0}' def cleanup(self, timestamp: Optional[float] = None): - """Remove expired jobs from registry and add them to FailedJobRegistry. + """Remove abandoned jobs from registry and add them to FailedJobRegistry. Removes jobs with an expiry time earlier than timestamp, specified as seconds since the Unix epoch. 
timestamp defaults to call time if @@ -226,6 +237,9 @@ class StartedJobRegistry(BaseRegistry): except NoSuchJobError: continue + job.execute_failure_callback(self.death_penalty_class, AbandonedJobError, AbandonedJobError(), + traceback.extract_stack()) + retry = job.retries_left and job.retries_left > 0 if retry: @@ -233,8 +247,11 @@ class StartedJobRegistry(BaseRegistry): job.retry(queue, pipeline) else: + exc_string = f"due to {AbandonedJobError.__name__}" + logger.warning(f'{self.__class__.__name__} cleanup: Moving job to {FailedJobRegistry.__name__} ' + f'({exc_string})') job.set_status(JobStatus.FAILED) - job._exc_info = "Moved to FailedJobRegistry at %s" % datetime.now() + job._exc_info = f"Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}" job.save(pipeline=pipeline, include_meta=False) job.cleanup(ttl=-1, pipeline=pipeline) failed_job_registry.add(job, job.failure_ttl) diff --git a/rq/results.py b/rq/results.py index a6dafde..55ee971 100644 --- a/rq/results.py +++ b/rq/results.py @@ -16,7 +16,7 @@ def get_key(job_id): return 'rq:results:%s' % job_id -class Result(object): +class Result: class Type(Enum): SUCCESSFUL = 1 FAILED = 2 @@ -85,7 +85,7 @@ class Result(object): # response = job.connection.zrange(cls.get_key(job.id), 0, 10, desc=True, withscores=True) response = job.connection.xrevrange(cls.get_key(job.id), '+', '-') results = [] - for (result_id, payload) in response: + for result_id, payload in response: results.append( cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) ) @@ -169,7 +169,10 @@ class Result(object): if pipeline is None: self.id = result.decode() if ttl is not None: - connection.expire(key, ttl) + if ttl == -1: + connection.persist(key) + else: + connection.expire(key, ttl) return self.id def serialize(self): diff --git a/rq/scheduler.py b/rq/scheduler.py index 84802b6..67b6431 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -6,6 +6,7 @@ import traceback from 
datetime import datetime from enum import Enum from multiprocessing import Process +from typing import List from redis import SSLConnection, UnixDomainSocketConnection @@ -35,14 +36,14 @@ class RQScheduler: Status = SchedulerStatus def __init__( - self, - queues, - connection, - interval=1, - logging_level=logging.INFO, - date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, - serializer=None, + self, + queues, + connection, + interval=1, + logging_level=logging.INFO, + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + serializer=None, ): self._queue_names = set(parse_names(queues)) self._acquired_locks = set() @@ -110,7 +111,7 @@ class RQScheduler: """Returns names of queue it successfully acquires lock on""" successful_locks = set() pid = os.getpid() - self.log.debug("Trying to acquire locks for %s", ", ".join(self._queue_names)) + self.log.debug('Trying to acquire locks for %s', ', '.join(self._queue_names)) for name in self._queue_names: if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60): successful_locks.add(name) @@ -166,7 +167,7 @@ class RQScheduler: jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer) for job in jobs: if job is not None: - queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) + queue._enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) registry.remove(job, pipeline=pipeline) pipeline.execute() self._status = self.Status.STARTED @@ -184,7 +185,7 @@ class RQScheduler: def heartbeat(self): """Updates the TTL on scheduler keys and the locks""" - self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) + self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks)) if len(self._queue_names) > 1: with self.connection.pipeline() as pipeline: for name in self._acquired_locks: @@ -196,7 +197,7 @@ class RQScheduler: 
self.connection.expire(key, self.interval + 60) def stop(self): - self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names)) + self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._queue_names)) self.release_locks() self._status = self.Status.STOPPED @@ -232,10 +233,21 @@ class RQScheduler: def run(scheduler): - scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid()) + scheduler.log.info('Scheduler for %s started with PID %s', ', '.join(scheduler._queue_names), os.getpid()) try: scheduler.work() except: # noqa scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc()) raise - scheduler.log.info("Scheduler with PID %s has stopped", os.getpid()) + scheduler.log.info('Scheduler with PID %d has stopped', os.getpid()) + + +def parse_names(queues_or_names) -> List[str]: + """Given a list of strings or queues, returns queue names""" + names = [] + for queue_or_name in queues_or_names: + if isinstance(queue_or_name, Queue): + names.append(queue_or_name.name) + else: + names.append(str(queue_or_name)) + return names diff --git a/rq/timeouts.py b/rq/timeouts.py index a1401c5..44f01f9 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -110,10 +110,14 @@ class TimerDeathPenalty(BaseDeathPenalty): def setup_death_penalty(self): """Starts the timer.""" + if self._timeout <= 0: + return self._timer = self.new_timer() self._timer.start() def cancel_death_penalty(self): """Cancels the timer.""" + if self._timeout <= 0: + return self._timer.cancel() self._timer = None diff --git a/rq/utils.py b/rq/utils.py index c8c474d..051d5ee 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -96,7 +96,6 @@ def make_colorizer(color: str): class ColorizingStreamHandler(logging.StreamHandler): - levels = { logging.WARNING: make_colorizer('darkyellow'), logging.ERROR: make_colorizer('darkred'), @@ -182,10 +181,7 @@ def import_attribute(name: str) -> 
Callable[..., Any]: E.g.: package_a.package_b.module_a.ClassA.my_static_method Thus we remove the bits from the end of the name until we can import it - Sometimes the failure during importing is due to a genuine coding error in the imported module - In this case, the exception is logged as a warning for ease of debugging. - The above logic will apply anyways regardless of the cause of the import error. - + Args: name (str): The name (reference) to the path. @@ -204,7 +200,6 @@ def import_attribute(name: str) -> Callable[..., Any]: module = importlib.import_module(module_name) break except ImportError: - logger.warning("Import error for '%s'" % module_name, exc_info=True) attribute_bits.insert(0, module_name_bits.pop()) if module is None: @@ -354,7 +349,7 @@ def str_to_date(date_str: Optional[str]) -> Union[dt.datetime, Any]: return utcparse(date_str.decode()) -def parse_timeout(timeout: Any): +def parse_timeout(timeout: Union[int, float, str]) -> int: """Transfer all kinds of timeout format to an integer representing seconds""" if not isinstance(timeout, numbers.Integral) and timeout is not None: try: diff --git a/rq/version.py b/rq/version.py index 342b3ec..9fb1be8 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '1.12.0' +VERSION = '1.13.0' diff --git a/rq/worker.py b/rq/worker.py index a324ad6..80c0384 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,5 +1,7 @@ +import contextlib import errno import logging +import math import os import random import signal @@ -8,14 +10,18 @@ import sys import time import traceback import warnings - from datetime import timedelta from enum import Enum -from uuid import uuid4 from random import shuffle -from typing import Any, Callable, List, Optional, TYPE_CHECKING, Tuple, Type, Union +from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, + Union) +from uuid import uuid4 if TYPE_CHECKING: + try: + from resource import struct_rusage + except ImportError: + pass from redis import 
Redis from redis.client import Pipeline @@ -23,12 +29,13 @@ try: from signal import SIGKILL except ImportError: from signal import SIGTERM as SIGKILL + from contextlib import suppress + import redis.exceptions from . import worker_registration from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command -from .utils import as_text from .connections import get_current_connection, push_connection, pop_connection from .defaults import ( @@ -41,18 +48,20 @@ from .defaults import ( DEFAULT_LOGGING_DATE_FORMAT, ) from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException + from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import Queue from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler +from .serializers import resolve_serializer from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact +from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text from .version import VERSION -from .worker_registration import clean_worker_registry, get_keys from .serializers import resolve_serializer + try: from setproctitle import setproctitle as setprocname except ImportError: @@ -90,6 +99,12 @@ def signal_name(signum): return 'SIG_UNKNOWN' +class DequeueStrategy(str, Enum): + DEFAULT = "default" + ROUND_ROBIN = "round_robin" + RANDOM = "random" + + class WorkerStatus(str, Enum): STARTED = 'started' SUSPENDED = 'suspended' @@ -108,9 +123,9 @@ class Worker: log_result_lifespan = True # `log_job_description` is used to toggle logging an entire jobs description. log_job_description = True - # factor to increase connection_wait_time incase of continous connection failures. 
+ # factor to increase connection_wait_time in case of continuous connection failures. exponential_backoff_factor = 2.0 - # Max Wait time (in seconds) after which exponential_backoff_factor wont be applicable. + # Max Wait time (in seconds) after which exponential_backoff_factor won't be applicable. max_connection_wait_time = 60.0 @classmethod @@ -132,7 +147,7 @@ class Worker: elif connection is None: connection = get_current_connection() - worker_keys = get_keys(queue=queue, connection=connection) + worker_keys = worker_registration.get_keys(queue=queue, connection=connection) workers = [ cls.find_by_key( key, connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer @@ -152,7 +167,7 @@ class Worker: Returns: list_keys (List[str]): A list of worker keys """ - return [as_text(key) for key in get_keys(queue=queue, connection=connection)] + return [as_text(key) for key in worker_registration.get_keys(queue=queue, connection=connection)] @classmethod def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> int: @@ -165,7 +180,7 @@ class Worker: Returns: length (int): The queue length. 
""" - return len(get_keys(queue=queue, connection=connection)) + return len(worker_registration.get_keys(queue=queue, connection=connection)) @classmethod def find_by_key( @@ -226,6 +241,7 @@ class Worker: exc_handler=None, exception_handlers=None, default_worker_ttl=DEFAULT_WORKER_TTL, + maintenance_interval: int = DEFAULT_MAINTENANCE_TASK_INTERVAL, job_class: Type['Job'] = None, queue_class=None, log_job_description: bool = True, @@ -233,11 +249,12 @@ class Worker: disable_default_exception_handler: bool = False, prepare_for_work: bool = True, serializer=None, + work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None ): # noqa - self.default_result_ttl = default_result_ttl self.worker_ttl = default_worker_ttl self.job_monitoring_interval = job_monitoring_interval + self.maintenance_interval = maintenance_interval connection = self._set_connection(connection) self.connection = connection @@ -250,7 +267,8 @@ class Worker: self.serializer = resolve_serializer(serializer) queues = [ - self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer) + self.queue_class(name=q, connection=connection, job_class=self.job_class, + serializer=self.serializer, death_penalty_class=self.death_penalty_class,) if isinstance(q, str) else q for q in ensure_list(queues) @@ -261,6 +279,7 @@ class Worker: self.validate_queues() self._ordered_queues = self.queues[:] self._exc_handlers: List[Callable] = [] + self._work_horse_killed_handler = work_horse_killed_handler self._state: str = 'starting' self._is_horse: bool = False @@ -279,6 +298,7 @@ class Worker: self.scheduler: Optional[RQScheduler] = None self.pubsub = None self.pubsub_thread = None + self._dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT self.disable_default_exception_handler = disable_default_exception_handler @@ -532,8 +552,7 @@ class Worker: return self.job_class.fetch(job_id, self.connection, self.serializer) def 
_install_signal_handlers(self): - """Installs signal handlers for handling SIGINT and SIGTERM gracefully. - """ + """Installs signal handlers for handling SIGINT and SIGTERM gracefully.""" signal.signal(signal.SIGINT, self.request_stop) signal.signal(signal.SIGTERM, self.request_stop) @@ -553,18 +572,14 @@ class Worker: else: raise - def wait_for_horse(self) -> Tuple[Optional[int], Optional[int]]: + def wait_for_horse(self) -> Tuple[Optional[int], Optional[int], Optional['struct_rusage']]: """Waits for the horse process to complete. Uses `0` as argument as to include "any child in the process group of the current process". """ - pid = None - stat = None - try: - pid, stat = os.waitpid(self.horse_pid, 0) - except ChildProcessError: - # ChildProcessError: [Errno 10] No child processes - pass - return pid, stat + pid = stat = rusage = None + with contextlib.suppress(ChildProcessError): # ChildProcessError: [Errno 10] No child processes + pid, stat, rusage = os.wait4(self.horse_pid, 0) + return pid, stat, rusage def request_force_stop(self, signum, frame): """Terminates the application (cold shutdown). @@ -621,13 +636,11 @@ class Worker: self.log.info('Warm shut down requested') def check_for_suspension(self, burst: bool): - """Check to see if workers have been suspended by `rq suspend` - """ + """Check to see if workers have been suspended by `rq suspend`""" before_state = None notified = False while not self._stop_requested and is_suspended(self.connection, self): - if burst: self.log.info('Suspended in burst mode, exiting') self.log.info('Note: There could still be unfinished jobs on the queue') @@ -674,14 +687,90 @@ class Worker: self.pubsub.unsubscribe() self.pubsub.close() - def reorder_queues(self, reference_queue): - """Method placeholder to workers that implement some reordering strategy. - `pass` here means that the queue will remain with the same job order. 
+ def reorder_queues(self, reference_queue: 'Queue'): + """Reorder the queues according to the strategy. + As this can be defined both in the `Worker` initialization or in the `work` method, + it doesn't take the strategy directly, but rather uses the private `_dequeue_strategy` attribute. Args: - reference_queue (Union[Queue, str]): The queue - """ - pass + reference_queue (Union[Queue, str]): The queues to reorder + """ + if self._dequeue_strategy is None: + self._dequeue_strategy = DequeueStrategy.DEFAULT + + if self._dequeue_strategy not in ("default", "random", "round_robin"): + raise ValueError( + f"Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`." + ) + if self._dequeue_strategy == DequeueStrategy.DEFAULT: + return + if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN: + pos = self._ordered_queues.index(reference_queue) + self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] + return + if self._dequeue_strategy == DequeueStrategy.RANDOM: + shuffle(self._ordered_queues) + return + + def bootstrap( + self, + logging_level: str = "INFO", + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT + ): + """Bootstraps the worker. + Runs the basic tasks that should run when the worker actually starts working. + Used so that new workers can focus on the work loop implementation rather + than the full bootstraping process. + + Args: + logging_level (str, optional): Logging level to use. Defaults to "INFO". + date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. + log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. 
+ """ + setup_loghandlers(logging_level, date_format, log_format) + self.register_birth() + self.log.info('Worker %s: started, version %s', self.key, VERSION) + self.subscribe() + self.set_state(WorkerStatus.STARTED) + qnames = self.queue_names() + self.log.info('*** Listening on %s...', green(', '.join(qnames))) + + def _start_scheduler( + self, + burst: bool = False, + logging_level: str = "INFO", + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT, + ): + """Starts the scheduler process. + This is specifically designed to be run by the worker when running the `work()` method. + Instanciates the RQScheduler and tries to acquire a lock. + If the lock is acquired, start scheduler. + If worker is on burst mode just enqueues scheduled jobs and quits, + otherwise, starts the scheduler in a separate process. + + Args: + burst (bool, optional): Whether to work on burst mode. Defaults to False. + logging_level (str, optional): Logging level to use. Defaults to "INFO". + date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. + log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. + """ + self.scheduler = RQScheduler( + self.queues, + connection=self.connection, + logging_level=logging_level, + date_format=date_format, + log_format=log_format, + serializer=self.serializer, + ) + self.scheduler.acquire_locks() + if self.scheduler.acquired_locks: + if burst: + self.scheduler.enqueue_scheduled_jobs() + self.scheduler.release_locks() + else: + self.scheduler.start() def work( self, @@ -690,13 +779,16 @@ class Worker: date_format: str = DEFAULT_LOGGING_DATE_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT, max_jobs: Optional[int] = None, + max_idle_time: Optional[int] = None, with_scheduler: bool = False, + dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT ) -> bool: """Starts the work loop. Pops and performs all jobs on the current list of queues. 
When all queues are empty, block and wait for new jobs to arrive on any of the queues, unless `burst` mode is enabled. + If `max_idle_time` is provided, worker will die when it's idle for more than the provided value. The return value indicates whether any jobs were processed. @@ -706,39 +798,18 @@ class Worker: date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. + max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None. with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. + dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. Defaults to DequeueStrategy.DEFAULT Returns: worked (bool): Will return True if any job was processed, False otherwise. """ - setup_loghandlers(logging_level, date_format, log_format) + self.bootstrap(logging_level, date_format, log_format) + self._dequeue_strategy = dequeue_strategy completed_jobs = 0 - self.register_birth() - self.log.info("Worker %s: started, version %s", self.key, VERSION) - self.subscribe() - self.set_state(WorkerStatus.STARTED) - qnames = self.queue_names() - self.log.info('*** Listening on %s...', green(', '.join(qnames))) - if with_scheduler: - self.scheduler = RQScheduler( - self.queues, - connection=self.connection, - logging_level=logging_level, - date_format=date_format, - log_format=log_format, - serializer=self.serializer, - ) - self.scheduler.acquire_locks() - # If lock is acquired, start scheduler - if self.scheduler.acquired_locks: - # If worker is run on burst mode, enqueue_scheduled_jobs() - # before working. 
Otherwise, start scheduler in a separate process - if burst: - self.scheduler.enqueue_scheduled_jobs() - self.scheduler.release_locks() - else: - self.scheduler.start() + self._start_scheduler(burst, logging_level, date_format, log_format) self._install_signal_handlers() try: @@ -754,25 +825,26 @@ class Worker: break timeout = None if burst else self.dequeue_timeout - result = self.dequeue_job_and_maintain_ttl(timeout) + result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time) if result is None: if burst: - self.log.info("Worker %s: done, quitting", self.key) + self.log.info('Worker %s: done, quitting', self.key) + elif max_idle_time is not None: + self.log.info('Worker %s: idle for %d seconds, quitting', self.key, max_idle_time) break job, queue = result - self.reorder_queues(reference_queue=queue) self.execute_job(job, queue) self.heartbeat() completed_jobs += 1 if max_jobs is not None: if completed_jobs >= max_jobs: - self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs) + self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs) break except redis.exceptions.TimeoutError: - self.log.error(f"Worker {self.key}: Redis connection timeout, quitting...") + self.log.error('Worker %s: Redis connection timeout, quitting...', self.key) break except StopRequested: @@ -786,15 +858,16 @@ class Worker: self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True) break finally: - if not self.is_horse: - - if self.scheduler: - self.stop_scheduler() - - self.register_death() - self.unsubscribe() + self.teardown() return bool(completed_jobs) + def teardown(self): + if not self.is_horse: + if self.scheduler: + self.stop_scheduler() + self.register_death() + self.unsubscribe() + def stop_scheduler(self): """Ensure scheduler process is stopped Will send the kill signal to scheduler process, @@ -808,7 +881,7 @@ class Worker: pass self.scheduler._process.join() - def 
dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']: + def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']: """Dequeues a job while maintaining the TTL. Returns: @@ -821,25 +894,31 @@ class Worker: self.procline('Listening on ' + qnames) self.log.debug('*** Listening on %s...', green(qnames)) connection_wait_time = 1.0 + idle_since = utcnow() + idle_time_left = max_idle_time while True: - try: self.heartbeat() if self.should_run_maintenance_tasks: self.run_maintenance_tasks() - self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}") + if timeout is not None and idle_time_left is not None: + timeout = min(timeout, idle_time_left) + + self.log.debug('Dequeueing jobs on queues %s and timeout %d', green(qnames), timeout) result = self.queue_class.dequeue_any( self._ordered_queues, timeout, connection=self.connection, job_class=self.job_class, serializer=self.serializer, + death_penalty_class=self.death_penalty_class, ) if result is not None: job, queue = result - self.log.debug(f"Dequeued job {blue(job.id)} from {green(queue.name)}") + self.reorder_queues(reference_queue=queue) + self.log.debug('Dequeued job %s from %s', blue(job.id), green(queue.name)) job.redis_server_version = self.get_redis_server_version() if self.log_job_description: self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id) @@ -848,7 +927,11 @@ class Worker: break except DequeueTimeout: - pass + if max_idle_time is not None: + idle_for = (utcnow() - idle_since).total_seconds() + idle_time_left = math.ceil(max_idle_time - idle_for) + if idle_time_left <= 0: + break except redis.exceptions.ConnectionError as conn_err: self.log.error( 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time @@ -1028,12 +1111,12 @@ class Worker: job (Job): _description_ queue (Queue): _description_ """ - ret_val = None + retpid = 
ret_val = rusage = None job.started_at = utcnow() while True: try: - with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): - retpid, ret_val = self.wait_for_horse() + with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException): + retpid, ret_val, rusage = self.wait_for_horse() break except HorseMonitorTimeoutException: # Horse has not exited yet and is still running. @@ -1073,20 +1156,20 @@ class Worker: elif self._stopped_job_id == job.id: # Work-horse killed deliberately self.log.warning('Job stopped by user, moving job to FailedJobRegistry') - self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.") + self.handle_job_failure(job, queue=queue, exc_string='Job stopped by user, work-horse terminated.') elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: if not job.ended_at: job.ended_at = utcnow() # Unhandled failure: move the job to the failed queue - self.log.warning( - ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format( - ret_val - ) - ) + signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else '' + exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; " + self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string) + self.handle_work_horse_killed(job, retpid, ret_val, rusage) self.handle_job_failure( - job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val + job, queue=queue, + exc_string=exc_string ) def execute_job(self, job: 'Job', queue: 'Queue'): @@ -1156,7 +1239,7 @@ class Worker: """Performs misc bookkeeping like updating states prior to job execution. 
""" - self.log.debug(f"Preparing for execution of Job ID {job.id}") + self.log.debug('Preparing for execution of Job ID %s', job.id) with self.connection.pipeline() as pipeline: self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_working_time(0, pipeline=pipeline) @@ -1167,7 +1250,7 @@ class Worker: job.prepare_for_execution(self.name, pipeline=pipeline) pipeline.execute() - self.log.debug(f"Job preparation finished.") + self.log.debug('Job preparation finished.') msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) @@ -1267,7 +1350,7 @@ class Worker: result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: - self.log.debug(f"Saving job {job.id}'s successful execution result") + self.log.debug('Saving job %s\'s successful execution result', job.id) job._handle_success(result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) @@ -1280,30 +1363,6 @@ class Worker: except redis.exceptions.WatchError: continue - def execute_success_callback(self, job: 'Job', result: Any): - """Executes success_callback for a job. - with timeout . - - Args: - job (Job): The Job - result (Any): The job's result. - """ - self.log.debug(f"Running success callbacks for {job.id}") - job.heartbeat(utcnow(), CALLBACK_TIMEOUT) - with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): - job.success_callback(job, self.connection, result) - - def execute_failure_callback(self, job: 'Job'): - """Executes failure_callback with timeout - - Args: - job (Job): The Job - """ - self.log.debug(f"Running failure callbacks for {job.id}") - job.heartbeat(utcnow(), CALLBACK_TIMEOUT) - with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): - job.failure_callback(job, self.connection, *sys.exc_info()) - def perform_job(self, job: 'Job', queue: 'Queue') -> bool: """Performs the actual work of a job. 
Will/should only be called inside the work horse's process. @@ -1317,7 +1376,7 @@ class Worker: """ push_connection(self.connection) started_job_registry = queue.started_job_registry - self.log.debug("Started Job Registry set.") + self.log.debug('Started Job Registry set.') try: self.prepare_job_execution(job) @@ -1325,9 +1384,9 @@ class Worker: job.started_at = utcnow() timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): - self.log.debug("Performing Job...") + self.log.debug('Performing Job...') rv = job.perform() - self.log.debug(f"Finished performing Job ID {job.id}") + self.log.debug('Finished performing Job ID %s', job.id) job.ended_at = utcnow() @@ -1335,23 +1394,22 @@ class Worker: # to use the same exc handling when pickling fails job._result = rv - if job.success_callback: - self.execute_success_callback(job, rv) + job.heartbeat(utcnow(), job.success_callback_timeout) + job.execute_success_callback(self.death_penalty_class, rv) self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) except: # NOQA - self.log.debug(f"Job {job.id} raised an exception.") + self.log.debug('Job %s raised an exception.', job.id) job.ended_at = utcnow() exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) - if job.failure_callback: - try: - self.execute_failure_callback(job) - except: # noqa - self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True) - exc_info = sys.exc_info() - exc_string = ''.join(traceback.format_exception(*exc_info)) + try: + job.heartbeat(utcnow(), job.failure_callback_timeout) + job.execute_failure_callback(self.death_penalty_class, *exc_info) + except: # noqa + exc_info = sys.exc_info() + exc_string = ''.join(traceback.format_exception(*exc_info)) self.handle_job_failure( job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry @@ -1364,8 +1422,7 
@@ class Worker: self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id) if rv is not None: - log_result = "{0!r}".format(as_text(str(rv))) - self.log.debug('Result: %s', yellow(log_result)) + self.log.debug('Result: %r', yellow(as_text(str(rv)))) if self.log_result_lifespan: result_ttl = job.get_result_ttl(self.default_result_ttl) @@ -1384,7 +1441,7 @@ class Worker: the other properties are accessed, which will stop exceptions from being properly logged, so we guard against it here. """ - self.log.debug(f"Handling exception for {job.id}.") + self.log.debug('Handling exception for %s.', job.id) exc_string = ''.join(traceback.format_exception(*exc_info)) try: extra = { @@ -1401,7 +1458,9 @@ class Worker: extra.update({'queue': job.origin, 'job_id': job.id}) # func_name - self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra) + self.log.error( + '[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra + ) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) @@ -1423,6 +1482,12 @@ class Worker: """Pops the latest exception handler off of the exc handler stack.""" return self._exc_handlers.pop() + def handle_work_horse_killed(self, job, retpid, ret_val, rusage): + if self._work_horse_killed_handler is None: + return + + self._work_horse_killed_handler(job, retpid, ret_val, rusage) + def __eq__(self, other): """Equality does not take the database/connection into account""" if not isinstance(other, self.__class__): @@ -1441,7 +1506,7 @@ class Worker: if queue.acquire_cleaning_lock(): self.log.info('Cleaning registries for queue: %s', queue.name) clean_registries(queue) - clean_worker_registry(queue) + worker_registration.clean_worker_registry(queue) self.last_cleaned_at = utcnow() @property @@ -1449,7 +1514,7 @@ class Worker: """Maintenance tasks should run on first startup or every 10 minutes.""" if self.last_cleaned_at is 
None: return True - if (utcnow() - self.last_cleaned_at) > timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL): + if (utcnow() - self.last_cleaned_at) > timedelta(seconds=self.maintenance_interval): return True return False @@ -1490,6 +1555,7 @@ class HerokuWorker(Worker): * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn causes the horse to crash `imminent_shutdown_delay` seconds later """ + imminent_shutdown_delay = 6 frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace'] @@ -1532,7 +1598,7 @@ class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] class RandomWorker(Worker): diff --git a/rq/worker_registration.py b/rq/worker_registration.py index 0f31f1d..fe4dc04 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -86,7 +86,6 @@ def clean_worker_registry(queue: 'Queue'): keys = list(get_keys(queue)) with queue.connection.pipeline() as pipeline: - for key in keys: pipeline.exists(key) results = pipeline.execute() diff --git a/tests/test_cli.py b/tests/test_cli.py index 07b9c39..daa118b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -5,6 +5,7 @@ from uuid import uuid4 import os import json +from click import BadParameter from click.testing import CliRunner from redis import Redis @@ -14,6 +15,7 @@ from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, pars from rq.job import Job from rq.registry import FailedJobRegistry, ScheduledJobRegistry from rq.serializers import JSONSerializer +from rq.timeouts import UnixSignalDeathPenalty from rq.worker import Worker, WorkerStatus from rq.scheduler import RQScheduler @@ -118,6 +120,23 @@ class TestRQCli(RQTestCase): 'testhost.example.com', ) + def 
test_death_penalty_class(self): + cli_config = CliConfig() + + self.assertEqual( + UnixSignalDeathPenalty, + cli_config.death_penalty_class + ) + + cli_config = CliConfig(death_penalty_class='rq.job.Job') + self.assertEqual( + Job, + cli_config.death_penalty_class + ) + + with self.assertRaises(BadParameter): + CliConfig(death_penalty_class='rq.abcd') + def test_empty_nothing(self): """rq empty -u <url>""" runner = CliRunner() @@ -326,6 +345,21 @@ class TestRQCli(RQTestCase): result = runner.invoke(main, args + ['--quiet', '--verbose']) self.assertNotEqual(result.exit_code, 0) + def test_worker_dequeue_strategy(self): + """--quiet and --verbose logging options are supported""" + runner = CliRunner() + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'random'] + result = runner.invoke(main, args) + self.assert_normal_execution(result) + + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'round_robin'] + result = runner.invoke(main, args) + self.assert_normal_execution(result) + + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'wrong'] + result = runner.invoke(main, args) + self.assertEqual(result.exit_code, 1) + def test_exception_handlers(self): """rq worker -u <url> -b --exception-handler <handler>""" connection = Redis.from_url(self.redis_url) diff --git a/tests/test_connection.py b/tests/test_connection.py index fdfafbd..393c20d 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,9 +1,7 @@ from redis import Redis -from rq import Connection, Queue, use_connection, get_current_connection, pop_connection -from rq.connections import NoRedisConnectionException - -from tests import find_empty_redis_database, RQTestCase +from rq import Connection, Queue +from tests import RQTestCase, find_empty_redis_database from tests.fixtures import do_nothing @@ -37,30 +35,3 @@ class TestConnectionInheritance(RQTestCase): job2 = q2.enqueue(do_nothing) self.assertEqual(q1.connection, job1.connection) 
self.assertEqual(q2.connection, job2.connection) - - -class TestConnectionHelpers(RQTestCase): - def test_use_connection(self): - """Test function use_connection works as expected.""" - conn = new_connection() - use_connection(conn) - - self.assertEqual(conn, get_current_connection()) - - use_connection() - - self.assertNotEqual(conn, get_current_connection()) - - use_connection(self.testconn) # Restore RQTestCase connection - - with self.assertRaises(AssertionError): - with Connection(new_connection()): - use_connection() - with Connection(new_connection()): - use_connection() - - def test_resolve_connection_raises_on_no_connection(self): - """Test function resolve_connection raises if there is no connection.""" - pop_connection() - with self.assertRaises(NoRedisConnectionException): - Queue() diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 26b115d..a290a87 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -117,6 +117,37 @@ class TestDependencies(RQTestCase): self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"]) + def test_dependency_list_in_depends_on(self): + """Enqueue with Dependency list in depends_on""" + q = Queue(connection=self.testconn) + w = SimpleWorker([q], connection=q.connection) + + # enqueue dependent job when parent successfully finishes + parent_job1 = q.enqueue(say_hello) + parent_job2 = q.enqueue(say_hello) + job = q.enqueue_call(say_hello, depends_on=[Dependency([parent_job1]), Dependency([parent_job2])]) + w.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + + + def test_enqueue_job_dependency(self): + """Enqueue via Queue.enqueue_job() with depencency""" + q = Queue(connection=self.testconn) + w = SimpleWorker([q], connection=q.connection) + + # enqueue dependent job when parent successfully finishes + parent_job = Job.create(say_hello) + parent_job.save() + job = Job.create(say_hello, depends_on=parent_job) + q.enqueue_job(job) + w.work(burst=True) + 
self.assertEqual(job.get_status(), JobStatus.DEFERRED) + q.enqueue_job(parent_job) + w.work(burst=True) + self.assertEqual(parent_job.get_status(), JobStatus.FINISHED) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + + def test_dependencies_are_met_if_parent_is_canceled(self): """When parent job is canceled, it should be treated as failed""" queue = Queue(connection=self.testconn) diff --git a/tests/test_helpers.py b/tests/test_helpers.py index b43f13b..5a84f71 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,11 +1,12 @@ from rq.cli.helpers import get_redis_from_config from tests import RQTestCase - +from unittest import mock class TestHelpers(RQTestCase): - def test_get_redis_from_config(self): + @mock.patch('rq.cli.helpers.Sentinel') + def test_get_redis_from_config(self, sentinel_class_mock): """Ensure Redis connection params are properly parsed""" settings = { 'REDIS_URL': 'redis://localhost:1/1' @@ -39,3 +40,46 @@ class TestHelpers(RQTestCase): self.assertEqual(connection_kwargs['db'], 2) self.assertEqual(connection_kwargs['port'], 2) self.assertEqual(connection_kwargs['password'], 'bar') + + # Add Sentinel to the settings + settings.update({ + 'SENTINEL': { + 'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)], + 'MASTER_NAME': 'master', + 'DB': 2, + 'USERNAME': 'redis-user', + 'PASSWORD': 'redis-secret', + 'SOCKET_TIMEOUT': None, + 'CONNECTION_KWARGS': { + 'ssl_ca_path': None, + }, + 'SENTINEL_KWARGS': { + 'username': 'sentinel-user', + 'password': 'sentinel-secret', + }, + }, + }) + + # Ensure SENTINEL is preferred against REDIS_* parameters + redis = get_redis_from_config(settings) + sentinel_init_sentinels_args = sentinel_class_mock.call_args[0] + sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1] + self.assertEqual( + sentinel_init_sentinels_args, + ([('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],) + ) + 
self.assertDictEqual( + sentinel_init_sentinel_kwargs, + { + 'db': 2, + 'ssl': False, + 'username': 'redis-user', + 'password': 'redis-secret', + 'socket_timeout': None, + 'ssl_ca_path': None, + 'sentinel_kwargs': { + 'username': 'sentinel-user', + 'password': 'sentinel-secret', + } + } + ) diff --git a/tests/test_job.py b/tests/test_job.py index 9d1ceae..23bbd11 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,4 +1,6 @@ import json + +from rq.defaults import CALLBACK_TIMEOUT from rq.serializers import JSONSerializer import time import queue @@ -9,7 +11,7 @@ from redis import WatchError from rq.utils import as_text from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError -from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job +from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job, Callback from rq.queue import Queue from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry, @@ -209,9 +211,9 @@ class TestJob(RQTestCase): # ... 
and no other keys are stored self.assertEqual( - set(self.testconn.hkeys(job.key)), {b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at', - b'worker_name', b'success_callback_name', b'failure_callback_name'} + b'worker_name', b'success_callback_name', b'failure_callback_name'}, + set(self.testconn.hkeys(job.key)) ) self.assertEqual(job.last_heartbeat, None) @@ -241,6 +243,31 @@ class TestJob(RQTestCase): self.assertEqual(stored_job.dependency.id, parent_job.id) self.assertEqual(stored_job.dependency, parent_job) + def test_persistence_of_callbacks(self): + """Storing jobs with success and/or failure callbacks.""" + job = Job.create(func=fixtures.some_calculation, + on_success=Callback(fixtures.say_hello, timeout=10), + on_failure=fixtures.say_pid) # deprecated callable + job.save() + stored_job = Job.fetch(job.id) + + self.assertEqual(fixtures.say_hello, stored_job.success_callback) + self.assertEqual(10, stored_job.success_callback_timeout) + self.assertEqual(fixtures.say_pid, stored_job.failure_callback) + self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout) + + # None(s) + job = Job.create(func=fixtures.some_calculation, + on_failure=None) + job.save() + stored_job = Job.fetch(job.id) + self.assertIsNone(stored_job.success_callback) + self.assertEqual(CALLBACK_TIMEOUT, job.success_callback_timeout) # timeout should be never none + self.assertEqual(CALLBACK_TIMEOUT, stored_job.success_callback_timeout) + self.assertIsNone(stored_job.failure_callback) + self.assertEqual(CALLBACK_TIMEOUT, job.failure_callback_timeout) # timeout should be never none + self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout) + def test_store_then_fetch(self): """Store, then fetch.""" job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4), diff --git a/tests/test_registry.py b/tests/test_registry.py index 28a29ca..57584b5 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,9 
+1,12 @@ from datetime import datetime, timedelta +from unittest import mock +from unittest.mock import PropertyMock, ANY + from rq.serializers import JSONSerializer from rq.utils import as_text from rq.defaults import DEFAULT_FAILURE_TTL -from rq.exceptions import InvalidJobOperation +from rq.exceptions import InvalidJobOperation, AbandonedJobError from rq.job import Job, JobStatus, requeue_job from rq.queue import Queue from rq.utils import current_timestamp @@ -161,7 +164,9 @@ class TestRegistry(RQTestCase): self.assertNotIn(job, failed_job_registry) self.assertIn(job, self.registry) - self.registry.cleanup() + with mock.patch.object(Job, 'execute_failure_callback') as mocked: + self.registry.cleanup() + mocked.assert_called_once_with(queue.death_penalty_class, AbandonedJobError, ANY, ANY) self.assertIn(job.id, failed_job_registry) self.assertNotIn(job, self.registry) job.refresh() diff --git a/tests/test_results.py b/tests/test_results.py index 4f705f5..9bc1b9e 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -214,3 +214,25 @@ class TestScheduledJobRegistry(RQTestCase): job = queue.enqueue(div_by_zero) self.assertEqual(job.latest_result().type, Result.Type.FAILED) + + def test_job_return_value_result_ttl_infinity(self): + """Test job.return_value when queue.result_ttl=-1""" + queue = Queue(connection=self.connection, result_ttl=-1) + job = queue.enqueue(say_hello) + + # Returns None when there's no result + self.assertIsNone(job.return_value()) + + Result.create(job, Result.Type.SUCCESSFUL, ttl=-1, return_value=1) + self.assertEqual(job.return_value(), 1) + + def test_job_return_value_result_ttl_zero(self): + """Test job.return_value when queue.result_ttl=0""" + queue = Queue(connection=self.connection, result_ttl=0) + job = queue.enqueue(say_hello) + + # Returns None when there's no result + self.assertIsNone(job.return_value()) + + Result.create(job, Result.Type.SUCCESSFUL, ttl=0, return_value=1) + self.assertIsNone(job.return_value()) diff 
--git a/tests/test_timeouts.py b/tests/test_timeouts.py index 2872ee0..1f392a3 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -42,3 +42,10 @@ class TestTimeouts(RQTestCase): self.assertIn(job, failed_job_registry)
job.refresh()
self.assertIn("rq.timeouts.JobTimeoutException", job.exc_info)
+
+ # Test negative timeout doesn't raise JobTimeoutException,
+ # which implies an unintended immediate timeout.
+ job = q.enqueue(thread_friendly_sleep_func, args=(1,), job_timeout=-1)
+ w.work(burst=True)
+ job.refresh()
+ self.assertIn(job, finished_job_registry)
diff --git a/tests/test_worker.py b/tests/test_worker.py index cfce473..dfa0f1d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -19,6 +19,7 @@ import pytest from unittest import mock from unittest.mock import Mock +from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL from tests import RQTestCase, slow from tests.fixtures import ( access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing, @@ -607,6 +608,31 @@ class TestWorker(RQTestCase): # Should not have created evidence of execution self.assertEqual(os.path.exists(SENTINEL_FILE), False) + @slow + def test_max_idle_time(self): + q = Queue() + w = Worker([q]) + q.enqueue(say_hello, args=('Frank',)) + self.assertIsNotNone(w.dequeue_job_and_maintain_ttl(1)) + + # idle for 1 second + self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=1)) + + # idle for 3 seconds + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=3)) + self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer + + # idle for 2 seconds because idle_time is less than timeout + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2)) + self.assertLess((utcnow()-now).total_seconds(), 4) # 4 for some buffer + + # idle for 3 seconds because idle_time is less than two rounds of timeout + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(2, max_idle_time=3)) + self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer + @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -639,7 +665,6 @@ class TestWorker(RQTestCase): q = Queue() w = Worker([q]) - # Put it on the queue with a timeout value self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) def test_worker_ttl_param_resolves_timeout(self): @@ -936,7 +961,15 @@ class TestWorker(RQTestCase): worker.last_cleaned_at = utcnow() self.assertFalse(worker.should_run_maintenance_tasks) - 
worker.last_cleaned_at = utcnow() - timedelta(seconds=3700) + worker.last_cleaned_at = utcnow() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 100) + self.assertTrue(worker.should_run_maintenance_tasks) + + # custom maintenance_interval + worker = Worker(queue, maintenance_interval=10) + self.assertTrue(worker.should_run_maintenance_tasks) + worker.last_cleaned_at = utcnow() + self.assertFalse(worker.should_run_maintenance_tasks) + worker.last_cleaned_at = utcnow() - timedelta(seconds=11) self.assertTrue(worker.should_run_maintenance_tasks) def test_worker_calls_clean_registries(self): @@ -1103,6 +1136,59 @@ class TestWorker(RQTestCase): worker = Worker.find_by_key(w2.key) self.assertEqual(worker.python_version, python_version) + def test_dequeue_random_strategy(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) + + w = Worker(qs) + w.work(burst=True, dequeue_strategy="random") + + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)] + expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)] + + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + expected_rr.reverse() + expected_ser.reverse() + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + sorted_ids.sort() + expected_ser.sort() + self.assertEqual(sorted_ids, expected_ser) + + def test_dequeue_round_robin(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) + + w = Worker(qs) + w.work(burst=True, dequeue_strategy="round_robin") + + 
start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0', + 'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1', + 'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2'] + + self.assertEqual(expected, sorted_ids) + def wait_and_kill_work_horse(pid, time_to_wait=0.0): time.sleep(time_to_wait) @@ -1224,13 +1310,14 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): w.prepare_job_execution(job) w.fork_work_horse(job, queue) job.timeout = 5 - time.sleep(1) with open(sentinel_file) as f: subprocess_pid = int(f.read().strip()) self.assertTrue(psutil.pid_exists(subprocess_pid)) - w.monitor_work_horse(job, queue) + with mock.patch.object(w, 'handle_work_horse_killed', wraps=w.handle_work_horse_killed) as mocked: + w.monitor_work_horse(job, queue) + self.assertEqual(mocked.call_count, 1) fudge_factor = 1 total_time = w.job_monitoring_interval + 65 + fudge_factor |