summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/lint.yml6
-rw-r--r--README.md2
-rw-r--r--rq/__init__.py3
-rwxr-xr-xrq/cli/cli.py270
-rw-r--r--rq/cli/helpers.py57
-rw-r--r--rq/command.py2
-rw-r--r--rq/connections.py17
-rw-r--r--rq/contrib/sentry.py1
-rw-r--r--rq/decorators.py49
-rw-r--r--rq/defaults.py1
-rw-r--r--rq/job.py259
-rw-r--r--rq/local.py8
-rw-r--r--rq/logutils.py13
-rw-r--r--rq/queue.py413
-rw-r--r--rq/registry.py63
-rw-r--r--rq/results.py59
-rw-r--r--rq/scheduler.py38
-rw-r--r--rq/serializers.py4
-rw-r--r--rq/timeouts.py20
-rw-r--r--rq/utils.py41
-rw-r--r--rq/worker.py337
21 files changed, 1006 insertions, 657 deletions
diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index ea767a7..c99348d 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -25,7 +25,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
- pip install flake8
+ pip install flake8 black
- name: Lint with flake8
run: |
@@ -33,3 +33,7 @@ jobs:
flake8 . --select=E9,F63,F7,F82 --show-source
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --exit-zero --max-complexity=5
+
+ - name: Lint with black
+ run: |
+ black -S -l 120 rq/
diff --git a/README.md b/README.md
index 40d314b..5fe80cd 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,8 @@ RQ requires Redis >= 3.0.0.
[![Build status](https://github.com/rq/rq/workflows/Test%20rq/badge.svg)](https://github.com/rq/rq/actions?query=workflow%3A%22Test+rq%22)
[![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq)
[![Coverage](https://codecov.io/gh/rq/rq/branch/master/graph/badge.svg)](https://codecov.io/gh/rq/rq)
+[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
+
Full documentation can be found [here][d].
diff --git a/rq/__init__.py b/rq/__init__.py
index abca97c..d5db681 100644
--- a/rq/__init__.py
+++ b/rq/__init__.py
@@ -1,7 +1,6 @@
# flake8: noqa
-from .connections import (Connection, get_current_connection, pop_connection,
- push_connection, use_connection)
+from .connections import Connection, get_current_connection, pop_connection, push_connection, use_connection
from .job import cancel_job, get_current_job, requeue_job, Retry
from .queue import Queue
from .version import VERSION
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 75c5974..f781010 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -10,22 +10,34 @@ import click
from redis.exceptions import ConnectionError
from rq import Connection, Retry, __version__ as version
-from rq.cli.helpers import (read_config_file, refresh,
- setup_loghandlers_from_args,
- show_both, show_queues, show_workers, CliConfig, parse_function_args,
- parse_schedule)
+from rq.cli.helpers import (
+ read_config_file,
+ refresh,
+ setup_loghandlers_from_args,
+ show_both,
+ show_queues,
+ show_workers,
+ CliConfig,
+ parse_function_args,
+ parse_schedule,
+)
from rq.contrib.legacy import cleanup_ghosts
-from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
- DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS,
- DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL,
- DEFAULT_JOB_MONITORING_INTERVAL,
- DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT,
- DEFAULT_SERIALIZER_CLASS)
+from rq.defaults import (
+ DEFAULT_CONNECTION_CLASS,
+ DEFAULT_JOB_CLASS,
+ DEFAULT_QUEUE_CLASS,
+ DEFAULT_WORKER_CLASS,
+ DEFAULT_RESULT_TTL,
+ DEFAULT_WORKER_TTL,
+ DEFAULT_JOB_MONITORING_INTERVAL,
+ DEFAULT_LOGGING_FORMAT,
+ DEFAULT_LOGGING_DATE_FORMAT,
+ DEFAULT_SERIALIZER_CLASS,
+)
from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries
from rq.utils import import_attribute, get_call_string, make_colorizer
-from rq.suspension import (suspend as connection_suspend,
- resume as connection_resume, is_suspended)
+from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
from rq.worker_registration import clean_worker_registry
from rq.job import JobStatus
@@ -39,35 +51,26 @@ click.disable_unicode_literals_warning = True
shared_options = [
- click.option('--url', '-u',
- envvar='RQ_REDIS_URL',
- help='URL describing Redis connection details.'),
- click.option('--config', '-c',
- envvar='RQ_CONFIG',
- help='Module containing RQ settings.'),
- click.option('--worker-class', '-w',
- envvar='RQ_WORKER_CLASS',
- default=DEFAULT_WORKER_CLASS,
- help='RQ Worker class to use'),
- click.option('--job-class', '-j',
- envvar='RQ_JOB_CLASS',
- default=DEFAULT_JOB_CLASS,
- help='RQ Job class to use'),
- click.option('--queue-class',
- envvar='RQ_QUEUE_CLASS',
- default=DEFAULT_QUEUE_CLASS,
- help='RQ Queue class to use'),
- click.option('--connection-class',
- envvar='RQ_CONNECTION_CLASS',
- default=DEFAULT_CONNECTION_CLASS,
- help='Redis client class to use'),
- click.option('--path', '-P',
- default=['.'],
- help='Specify the import path.',
- multiple=True),
- click.option('--serializer', '-S',
- default=DEFAULT_SERIALIZER_CLASS,
- help='Path to serializer, defaults to rq.serializers.DefaultSerializer')
+ click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'),
+ click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'),
+ click.option(
+ '--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use'
+ ),
+ click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'),
+ click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'),
+ click.option(
+ '--connection-class',
+ envvar='RQ_CONNECTION_CLASS',
+ default=DEFAULT_CONNECTION_CLASS,
+ help='Redis client class to use',
+ ),
+ click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True),
+ click.option(
+ '--serializer',
+ '-S',
+ default=DEFAULT_SERIALIZER_CLASS,
+ help='Path to serializer, defaults to rq.serializers.DefaultSerializer',
+ ),
]
@@ -100,15 +103,16 @@ def empty(cli_config, all, queues, serializer, **options):
"""Empty given queues."""
if all:
- queues = cli_config.queue_class.all(connection=cli_config.connection,
- job_class=cli_config.job_class,
- serializer=serializer)
+ queues = cli_config.queue_class.all(
+ connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
+ )
else:
- queues = [cli_config.queue_class(queue,
- connection=cli_config.connection,
- job_class=cli_config.job_class,
- serializer=serializer)
- for queue in queues]
+ queues = [
+ cli_config.queue_class(
+ queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
+ )
+ for queue in queues
+ ]
if not queues:
click.echo('Nothing to do')
@@ -127,10 +131,9 @@ def empty(cli_config, all, queues, serializer, **options):
def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options):
"""Requeue failed jobs."""
- failed_job_registry = FailedJobRegistry(queue,
- connection=cli_config.connection,
- job_class=job_class,
- serializer=serializer)
+ failed_job_registry = FailedJobRegistry(
+ queue, connection=cli_config.connection, job_class=job_class, serializer=serializer
+ )
if all:
job_ids = failed_job_registry.get_job_ids()
@@ -159,8 +162,7 @@ def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options):
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
@click.argument('queues', nargs=-1)
@pass_cli_config
-def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
- **options):
+def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, **options):
"""RQ command-line monitor."""
if only_queues:
@@ -182,8 +184,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
clean_registries(queue)
clean_worker_registry(queue)
- refresh(interval, func, qs, raw, by_queue,
- cli_config.queue_class, cli_config.worker_class)
+ refresh(interval, func, qs, raw, by_queue, cli_config.queue_class, cli_config.worker_class)
except ConnectionError as e:
click.echo(e)
sys.exit(1)
@@ -200,7 +201,12 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--name', '-n', help='Specify a different name')
@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used')
-@click.option('--job-monitoring-interval', type=int, default=DEFAULT_JOB_MONITORING_INTERVAL, help='Default job monitoring interval to be used')
+@click.option(
+ '--job-monitoring-interval',
+ type=int,
+ default=DEFAULT_JOB_MONITORING_INTERVAL,
+ help='Default job monitoring interval to be used',
+)
@click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@@ -215,11 +221,31 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.argument('queues', nargs=-1)
@pass_cli_config
-def worker(cli_config, burst, logging_level, name, results_ttl,
- worker_ttl, job_monitoring_interval, disable_job_desc_logging,
- verbose, quiet, sentry_ca_certs, sentry_debug, sentry_dsn,
- exception_handler, pid, disable_default_exception_handler, max_jobs,
- with_scheduler, queues, log_format, date_format, serializer, **options):
+def worker(
+ cli_config,
+ burst,
+ logging_level,
+ name,
+ results_ttl,
+ worker_ttl,
+ job_monitoring_interval,
+ disable_job_desc_logging,
+ verbose,
+ quiet,
+ sentry_ca_certs,
+ sentry_debug,
+ sentry_dsn,
+ exception_handler,
+ pid,
+ disable_default_exception_handler,
+ max_jobs,
+ with_scheduler,
+ queues,
+ log_format,
+ date_format,
+ serializer,
+ **options
+):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
@@ -245,38 +271,46 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
sys.exit(1)
- queues = [cli_config.queue_class(queue,
- connection=cli_config.connection,
- job_class=cli_config.job_class,
- serializer=serializer)
- for queue in queues]
+ queues = [
+ cli_config.queue_class(
+ queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
+ )
+ for queue in queues
+ ]
worker = cli_config.worker_class(
- queues, name=name, connection=cli_config.connection,
- default_worker_ttl=worker_ttl, default_result_ttl=results_ttl,
+ queues,
+ name=name,
+ connection=cli_config.connection,
+ default_worker_ttl=worker_ttl,
+ default_result_ttl=results_ttl,
job_monitoring_interval=job_monitoring_interval,
- job_class=cli_config.job_class, queue_class=cli_config.queue_class,
+ job_class=cli_config.job_class,
+ queue_class=cli_config.queue_class,
exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler,
log_job_description=not disable_job_desc_logging,
- serializer=serializer
+ serializer=serializer,
)
# Should we configure Sentry?
if sentry_dsn:
- sentry_opts = {
- "ca_certs": sentry_ca_certs,
- "debug": sentry_debug
- }
+ sentry_opts = {"ca_certs": sentry_ca_certs, "debug": sentry_debug}
from rq.contrib.sentry import register_sentry
+
register_sentry(sentry_dsn, **sentry_opts)
# if --verbose or --quiet, override --logging_level
if verbose or quiet:
logging_level = None
- worker.work(burst=burst, logging_level=logging_level,
- date_format=date_format, log_format=log_format,
- max_jobs=max_jobs, with_scheduler=with_scheduler)
+ worker.work(
+ burst=burst,
+ logging_level=logging_level,
+ date_format=date_format,
+ log_format=log_format,
+ max_jobs=max_jobs,
+ with_scheduler=with_scheduler,
+ )
except ConnectionError as e:
print(e)
sys.exit(1)
@@ -296,7 +330,9 @@ def suspend(cli_config, duration, **options):
if duration:
msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
- automatically resume""".format(duration)
+ automatically resume""".format(
+ duration
+ )
click.echo(msg)
else:
click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed")
@@ -312,27 +348,51 @@ def resume(cli_config, **options):
@main.command()
@click.option('--queue', '-q', help='The name of the queue.', default='default')
-@click.option('--timeout',
- help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.')
+@click.option(
+ '--timeout', help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.'
+)
@click.option('--result-ttl', help='Specifies how long successful jobs and their results are kept.')
@click.option('--ttl', help='Specifies the maximum queued time of the job before it is discarded.')
@click.option('--failure-ttl', help='Specifies how long failed jobs are kept.')
@click.option('--description', help='Additional description of the job')
-@click.option('--depends-on', help='Specifies another job id that must complete before this job will be queued.',
- multiple=True)
+@click.option(
+ '--depends-on', help='Specifies another job id that must complete before this job will be queued.', multiple=True
+)
@click.option('--job-id', help='The id of this job')
@click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end')
@click.option('--retry-max', help='Maximum amount of retries', default=0, type=int)
@click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0])
@click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).')
-@click.option('--schedule-at', help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without '
- 'timezone (e.g. 2021-05-27T21:45:00).')
+@click.option(
+ '--schedule-at',
+ help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without '
+ 'timezone (e.g. 2021-05-27T21:45:00).',
+)
@click.option('--quiet', is_flag=True, help='Only logs errors.')
@click.argument('function')
@click.argument('arguments', nargs=-1)
@pass_cli_config
-def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, description, depends_on, job_id, at_front,
- retry_max, retry_interval, schedule_in, schedule_at, quiet, serializer, function, arguments, **options):
+def enqueue(
+ cli_config,
+ queue,
+ timeout,
+ result_ttl,
+ ttl,
+ failure_ttl,
+ description,
+ depends_on,
+ job_id,
+ at_front,
+ retry_max,
+ retry_interval,
+ schedule_in,
+ schedule_at,
+ quiet,
+ serializer,
+ function,
+ arguments,
+ **options
+):
"""Enqueues a job from the command line"""
args, kwargs = parse_function_args(arguments)
function_string = get_call_string(function, args, kwargs)
@@ -348,11 +408,37 @@ def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, descriptio
queue = cli_config.queue_class(queue, serializer=serializer)
if schedule is None:
- job = queue.enqueue_call(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl,
- description, depends_on, job_id, at_front, None, retry)
+ job = queue.enqueue_call(
+ function,
+ args,
+ kwargs,
+ timeout,
+ result_ttl,
+ ttl,
+ failure_ttl,
+ description,
+ depends_on,
+ job_id,
+ at_front,
+ None,
+ retry,
+ )
else:
- job = queue.create_job(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl,
- description, depends_on, job_id, None, JobStatus.SCHEDULED, retry)
+ job = queue.create_job(
+ function,
+ args,
+ kwargs,
+ timeout,
+ result_ttl,
+ ttl,
+ failure_ttl,
+ description,
+ depends_on,
+ job_id,
+ None,
+ JobStatus.SCHEDULED,
+ retry,
+ )
queue.schedule_job(job, schedule)
if not quiet:
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index af812ce..fb20109 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -13,8 +13,7 @@ from shutil import get_terminal_size
import click
from redis import Redis
from redis.sentinel import Sentinel
-from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
- DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
+from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS
from rq.logutils import setup_loghandlers
from rq.utils import import_attribute, parse_timeout
from rq.worker import WorkerStatus
@@ -27,16 +26,14 @@ yellow = partial(click.style, fg='yellow')
def read_config_file(module):
"""Reads all UPPERCASE variables defined in the given module file."""
settings = importlib.import_module(module)
- return dict([(k, v)
- for k, v in settings.__dict__.items()
- if k.upper() == k])
+ return dict([(k, v) for k, v in settings.__dict__.items() if k.upper() == k])
def get_redis_from_config(settings, connection_class=Redis):
"""Returns a StrictRedis instance from a dictionary of settings.
- To use redis sentinel, you must specify a dictionary in the configuration file.
- Example of a dictionary with keys without values:
- SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'PASSWORD':,'DB':, 'MASTER_NAME':}
+ To use redis sentinel, you must specify a dictionary in the configuration file.
+ Example of a dictionary with keys without values:
+ SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'PASSWORD':,'DB':, 'MASTER_NAME':}
"""
if settings.get('REDIS_URL') is not None:
return connection_class.from_url(settings['REDIS_URL'])
@@ -50,8 +47,7 @@ def get_redis_from_config(settings, connection_class=Redis):
ssl = settings['SENTINEL'].get('SSL', False)
arguments = {'password': password, 'ssl': ssl}
sn = Sentinel(
- instances, socket_timeout=socket_timeout, password=password,
- db=db, ssl=ssl, sentinel_kwargs=arguments
+ instances, socket_timeout=socket_timeout, password=password, db=db, ssl=ssl, sentinel_kwargs=arguments
)
return sn.master_for(master_name)
@@ -168,9 +164,7 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
for queue in queue_dict:
if queue_dict[queue]:
queues_str = ", ".join(
- sorted(
- map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue])
- )
+ sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue]))
)
else:
queues_str = '–'
@@ -188,6 +182,7 @@ def show_both(queues, raw, by_queue, queue_class, worker_class):
if not raw:
click.echo('')
import datetime
+
click.echo('Updated: %s' % datetime.datetime.now())
@@ -233,14 +228,14 @@ def parse_function_arg(argument, arg_pos):
if index > 0:
if ':' in argument and argument.index(':') + 1 == index: # keyword, json
mode = ParsingMode.JSON
- keyword = argument[:index - 1]
+ keyword = argument[: index - 1]
elif '%' in argument and argument.index('%') + 1 == index: # keyword, literal_eval
mode = ParsingMode.LITERAL_EVAL
- keyword = argument[:index - 1]
+ keyword = argument[: index - 1]
else: # keyword, text
mode = ParsingMode.PLAIN_TEXT
keyword = argument[:index]
- value = argument[index + 1:]
+ value = argument[index + 1 :]
else: # no keyword, text
mode = ParsingMode.PLAIN_TEXT
value = argument
@@ -261,9 +256,11 @@ def parse_function_arg(argument, arg_pos):
try:
value = literal_eval(value)
except Exception:
- raise click.BadParameter('Unable to eval %s as Python object. See '
- 'https://docs.python.org/3/library/ast.html#ast.literal_eval'
- % (keyword or '%s. non keyword argument' % arg_pos))
+ raise click.BadParameter(
+ 'Unable to eval %s as Python object. See '
+ 'https://docs.python.org/3/library/ast.html#ast.literal_eval'
+ % (keyword or '%s. non keyword argument' % arg_pos)
+ )
return keyword, value
@@ -294,9 +291,19 @@ def parse_schedule(schedule_in, schedule_at):
class CliConfig:
"""A helper class to be used with click commands, to handle shared options"""
- def __init__(self, url=None, config=None, worker_class=DEFAULT_WORKER_CLASS,
- job_class=DEFAULT_JOB_CLASS, queue_class=DEFAULT_QUEUE_CLASS,
- connection_class=DEFAULT_CONNECTION_CLASS, path=None, *args, **kwargs):
+
+ def __init__(
+ self,
+ url=None,
+ config=None,
+ worker_class=DEFAULT_WORKER_CLASS,
+ job_class=DEFAULT_JOB_CLASS,
+ queue_class=DEFAULT_QUEUE_CLASS,
+ connection_class=DEFAULT_CONNECTION_CLASS,
+ path=None,
+ *args,
+ **kwargs
+ ):
self._connection = None
self.url = url
self.config = config
@@ -331,9 +338,7 @@ class CliConfig:
self._connection = self.connection_class.from_url(self.url)
elif self.config:
settings = read_config_file(self.config) if self.config else {}
- self._connection = get_redis_from_config(settings,
- self.connection_class)
+ self._connection = get_redis_from_config(settings, self.connection_class)
else:
- self._connection = get_redis_from_config(os.environ,
- self.connection_class)
+ self._connection = get_redis_from_config(os.environ, self.connection_class)
return self._connection
diff --git a/rq/command.py b/rq/command.py
index 0f8ef6e..4566ec0 100644
--- a/rq/command.py
+++ b/rq/command.py
@@ -22,7 +22,7 @@ def send_command(connection: 'Redis', worker_name: str, command: str, **kwargs):
- `shutdown`: Shuts down a worker
- `kill-horse`: Command for the worker to kill the current working horse
- `stop-job`: A command for the worker to stop the currently running job
-
+
The command string will be parsed into a dictionary and send to a PubSub Topic.
Workers listen to the PubSub, and `handle` the specific command.
diff --git a/rq/connections.py b/rq/connections.py
index c5ebc20..413ee5a 100644
--- a/rq/connections.py
+++ b/rq/connections.py
@@ -30,8 +30,9 @@ def Connection(connection: Optional['Redis'] = None): # noqa
Args:
connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None.
"""
- warnings.warn("The Conneciton context manager is deprecated. Use the `connection` parameter instead.",
- DeprecationWarning)
+ warnings.warn(
+ "The Conneciton context manager is deprecated. Use the `connection` parameter instead.", DeprecationWarning
+ )
if connection is None:
connection = Redis()
push_connection(connection)
@@ -39,9 +40,9 @@ def Connection(connection: Optional['Redis'] = None): # noqa
yield
finally:
popped = pop_connection()
- assert popped == connection, \
- 'Unexpected Redis connection was popped off the stack. ' \
- 'Check your Redis connection setup.'
+ assert popped == connection, (
+ 'Unexpected Redis connection was popped off the stack. ' 'Check your Redis connection setup.'
+ )
def push_connection(redis: 'Redis'):
@@ -72,8 +73,7 @@ def use_connection(redis: Optional['Redis'] = None):
Args:
redis (Optional[Redis], optional): A Redis Connection. Defaults to None.
"""
- assert len(_connection_stack) <= 1, \
- 'You should not mix Connection contexts with use_connection()'
+ assert len(_connection_stack) <= 1, 'You should not mix Connection contexts with use_connection()'
release_local(_connection_stack)
if redis is None:
@@ -118,5 +118,4 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis':
_connection_stack = LocalStack()
-__all__ = ['Connection', 'get_current_connection', 'push_connection',
- 'pop_connection', 'use_connection']
+__all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection', 'use_connection']
diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py
index edf0fe4..efb55b2 100644
--- a/rq/contrib/sentry.py
+++ b/rq/contrib/sentry.py
@@ -4,4 +4,5 @@ def register_sentry(sentry_dsn, **opts):
"""
import sentry_sdk
from sentry_sdk.integrations.rq import RqIntegration
+
sentry_sdk.init(sentry_dsn, integrations=[RqIntegration()], **opts)
diff --git a/rq/decorators.py b/rq/decorators.py
index 70a61aa..2bf46e8 100644
--- a/rq/decorators.py
+++ b/rq/decorators.py
@@ -13,12 +13,23 @@ from .utils import backend_class
class job: # noqa
queue_class = Queue
- def __init__(self, queue: Union['Queue', str], connection: Optional['Redis'] = None, timeout: Optional[int] = None,
- result_ttl: int = DEFAULT_RESULT_TTL, ttl: Optional[int] = None,
- queue_class: Optional['Queue'] = None, depends_on: Optional[List[Any]] = None, at_front: Optional[bool] = None,
- meta: Optional[Dict[Any, Any]] = None, description: Optional[str] = None, failure_ttl: Optional[int] = None,
- retry: Optional['Retry'] = None, on_failure: Optional[Callable[..., Any]] = None,
- on_success: Optional[Callable[..., Any]] = None):
+ def __init__(
+ self,
+ queue: Union['Queue', str],
+ connection: Optional['Redis'] = None,
+ timeout: Optional[int] = None,
+ result_ttl: int = DEFAULT_RESULT_TTL,
+ ttl: Optional[int] = None,
+ queue_class: Optional['Queue'] = None,
+ depends_on: Optional[List[Any]] = None,
+ at_front: Optional[bool] = None,
+ meta: Optional[Dict[Any, Any]] = None,
+ description: Optional[str] = None,
+ failure_ttl: Optional[int] = None,
+ retry: Optional['Retry'] = None,
+ on_failure: Optional[Callable[..., Any]] = None,
+ on_success: Optional[Callable[..., Any]] = None,
+ ):
"""A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a required
``queue`` argument that can be either a ``Queue`` instance or a string
@@ -68,8 +79,7 @@ class job: # noqa
@wraps(f)
def delay(*args, **kwargs):
if isinstance(self.queue, str):
- queue = self.queue_class(name=self.queue,
- connection=self.connection)
+ queue = self.queue_class(name=self.queue, connection=self.connection)
else:
queue = self.queue
@@ -83,10 +93,23 @@ class job: # noqa
if not at_front:
at_front = self.at_front
- return queue.enqueue_call(f, args=args, kwargs=kwargs,
- timeout=self.timeout, result_ttl=self.result_ttl,
- ttl=self.ttl, depends_on=depends_on, job_id=job_id, at_front=at_front,
- meta=self.meta, description=self.description, failure_ttl=self.failure_ttl,
- retry=self.retry, on_failure=self.on_failure, on_success=self.on_success)
+ return queue.enqueue_call(
+ f,
+ args=args,
+ kwargs=kwargs,
+ timeout=self.timeout,
+ result_ttl=self.result_ttl,
+ ttl=self.ttl,
+ depends_on=depends_on,
+ job_id=job_id,
+ at_front=at_front,
+ meta=self.meta,
+ description=self.description,
+ failure_ttl=self.failure_ttl,
+ retry=self.retry,
+ on_failure=self.on_failure,
+ on_success=self.on_success,
+ )
+
f.delay = delay
return f
diff --git a/rq/defaults.py b/rq/defaults.py
index ef76678..bd50489 100644
--- a/rq/defaults.py
+++ b/rq/defaults.py
@@ -88,4 +88,3 @@ DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
Uses Python's default attributes as defined
https://docs.python.org/3/library/logging.html#logrecord-attributes
"""
-
diff --git a/rq/job.py b/rq/job.py
index 9104747..61b359f 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -8,8 +8,7 @@ from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from enum import Enum
from redis import WatchError
-from typing import (TYPE_CHECKING, Any, Callable, Dict, Iterable, List,
- Optional, Tuple, Union)
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from uuid import uuid4
@@ -20,18 +19,27 @@ if TYPE_CHECKING:
from redis.client import Pipeline
from .connections import resolve_connection
-from .exceptions import (DeserializationError, InvalidJobOperation,
- NoSuchJobError)
+from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
from .local import LocalStack
from .serializers import resolve_serializer
from .types import FunctionReferenceType, JobDependencyType
-from .utils import (as_text, decode_redis_hash, ensure_list, get_call_string,
- get_version, import_attribute, parse_timeout, str_to_date,
- utcformat, utcnow)
+from .utils import (
+ as_text,
+ decode_redis_hash,
+ ensure_list,
+ get_call_string,
+ get_version,
+ import_attribute,
+ parse_timeout,
+ str_to_date,
+ utcformat,
+ utcnow,
+)
class JobStatus(str, Enum):
- """The Status of Job within its lifecycle at any given time. """
+ """The Status of Job within its lifecycle at any given time."""
+
QUEUED = 'queued'
FINISHED = 'finished'
FAILED = 'failed'
@@ -57,13 +65,9 @@ class Dependency:
Raises:
ValueError: If the `jobs` param has anything different than `str` or `Job` class or the job list is empty
- """
+ """
dependent_jobs = ensure_list(jobs)
- if not all(
- isinstance(job, Job) or isinstance(job, str)
- for job in dependent_jobs
- if job
- ):
+ if not all(isinstance(job, Job) or isinstance(job, str) for job in dependent_jobs if job):
raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids")
elif len(dependent_jobs) < 1:
raise ValueError("jobs: cannot be empty.")
@@ -79,8 +83,7 @@ yet been evaluated.
"""
-def cancel_job(job_id: str, connection: Optional['Redis'] = None,
- serializer=None, enqueue_dependents: bool = False):
+def cancel_job(job_id: str, connection: Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False):
"""Cancels the job with the given job ID, preventing execution.
Use with caution. This will discard any job info (i.e. it can't be requeued later).
@@ -103,13 +106,11 @@ def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['J
Returns:
job (Optional[Job]): The current Job running
- """
+ """
if connection:
- warnings.warn("connection argument for get_current_job is deprecated.",
- DeprecationWarning)
+ warnings.warn("connection argument for get_current_job is deprecated.", DeprecationWarning)
if job_class:
- warnings.warn("job_class argument for get_current_job is deprecated.",
- DeprecationWarning)
+ warnings.warn("job_class argument for get_current_job is deprecated.", DeprecationWarning)
return _job_stack.top
@@ -123,26 +124,38 @@ def requeue_job(job_id: str, connection: 'Redis', serializer=None) -> 'Job':
Returns:
Job: The requeued Job object.
- """
+ """
job = Job.fetch(job_id, connection=connection, serializer=serializer)
return job.requeue()
class Job:
"""A Job is just a convenient datastructure to pass around job (meta) data."""
+
redis_job_namespace_prefix = 'rq:job:'
@classmethod
- def create(cls, func: FunctionReferenceType, args: Union[List[Any], Optional[Tuple]] = None,
- kwargs: Optional[Dict[str, Any]] = None, connection: Optional['Redis'] = None,
- result_ttl: Optional[int] = None, ttl: Optional[int] = None,
- status: Optional[JobStatus] = None, description: Optional[str] =None,
- depends_on: Optional[JobDependencyType] = None,
- timeout: Optional[int] = None, id: Optional[str] = None,
- origin=None, meta: Optional[Dict[str, Any]] = None,
- failure_ttl: Optional[int] = None, serializer=None, *,
- on_success: Optional[Callable[..., Any]] = None,
- on_failure: Optional[Callable[..., Any]] = None) -> 'Job':
+ def create(
+ cls,
+ func: FunctionReferenceType,
+ args: Union[List[Any], Optional[Tuple]] = None,
+ kwargs: Optional[Dict[str, Any]] = None,
+ connection: Optional['Redis'] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ status: Optional[JobStatus] = None,
+ description: Optional[str] = None,
+ depends_on: Optional[JobDependencyType] = None,
+ timeout: Optional[int] = None,
+ id: Optional[str] = None,
+ origin=None,
+ meta: Optional[Dict[str, Any]] = None,
+ failure_ttl: Optional[int] = None,
+ serializer=None,
+ *,
+ on_success: Optional[Callable[..., Any]] = None,
+ on_failure: Optional[Callable[..., Any]] = None
+ ) -> 'Job':
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
@@ -185,7 +198,7 @@ class Job:
Returns:
Job: A job instance.
- """
+ """
if args is None:
args = ()
if kwargs is None:
@@ -247,10 +260,7 @@ class Job:
depends_on_list = depends_on.dependencies
else:
depends_on_list = ensure_list(depends_on)
- job._dependency_ids = [
- dep.id if isinstance(dep, Job) else dep
- for dep in depends_on_list
- ]
+ job._dependency_ids = [dep.id if isinstance(dep, Job) else dep for dep in depends_on_list]
return job
@@ -259,8 +269,9 @@ class Job:
Returns:
position (Optional[int]): The position
- """
+ """
from .queue import Queue
+
if self.origin:
q = Queue(name=self.origin, connection=self.connection)
return q.get_job_position(self._id)
@@ -274,7 +285,7 @@ class Job:
Returns:
status (JobStatus): The Job Status
- """
+ """
if refresh:
self._status = as_text(self.connection.hget(self.key, 'status'))
return self._status
@@ -285,7 +296,7 @@ class Job:
Args:
status (JobStatus): The Job Status to be set
pipeline (Optional[Pipeline], optional): Optional Redis Pipeline to use. Defaults to None.
- """
+ """
self._status = status
connection: 'Redis' = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'status', self._status)
@@ -298,7 +309,7 @@ class Job:
Returns:
meta (Dict): The dictionary of metadata
- """
+ """
if refresh:
meta = self.connection.hget(self.key, 'meta')
self.meta = self.serializer.loads(meta) if meta else {}
@@ -401,7 +412,7 @@ class Job:
Raises:
DeserializationError: Cathes any deserialization error (since serializers are generic)
- """
+ """
try:
self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data)
except Exception as e:
@@ -522,7 +533,7 @@ class Job:
job_ids (Iterable[str]): A list of job ids.
connection (Redis): Redis connection
serializer (Callable): A serializer
-
+
Returns:
jobs (list[Job]): A list of Jobs instances.
"""
@@ -580,17 +591,14 @@ class Job:
self.enqueue_at_front: Optional[bool] = None
from .results import Result
+
self._cached_result: Optional[Result] = None
def __repr__(self): # noqa # pragma: no cover
- return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
- self._id,
- self.enqueued_at)
+ return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, self._id, self.enqueued_at)
def __str__(self):
- return '<{0} {1}: {2}>'.format(self.__class__.__name__,
- self.id,
- self.description)
+ return '<{0} {1}: {2}>'.format(self.__class__.__name__, self.id, self.description)
def __eq__(self, other): # noqa
return isinstance(other, self.__class__) and self.id == other.id
@@ -612,7 +620,7 @@ class Job:
def set_id(self, value: str) -> None:
"""Sets a job ID for the given job
-
+
Args:
value (str): The value to set as Job ID
"""
@@ -630,7 +638,7 @@ class Job:
ttl (int): The time to live
pipeline (Optional[Pipeline], optional): Can receive a Redis' pipeline to use. Defaults to None.
xx (bool, optional): Only sets the key if already exists. Defaults to False.
- """
+ """
self.last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
@@ -690,14 +698,15 @@ class Job:
Returns:
jobs (list[Job]): A list of Jobs
- """
+ """
connection = pipeline if pipeline is not None else self.connection
if watch and self._dependency_ids:
- connection.watch(*[self.key_for(dependency_id)
- for dependency_id in self._dependency_ids])
+ connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids])
- dependencies_list = self.fetch_many(self._dependency_ids, connection=self.connection, serializer=self.serializer)
+ dependencies_list = self.fetch_many(
+ self._dependency_ids, connection=self.connection, serializer=self.serializer
+ )
jobs = [job for job in dependencies_list if job]
return jobs
@@ -706,8 +715,7 @@ class Job:
"""
Get the latest result and returns `exc_info` only if the latest result is a failure.
"""
- warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.",
- DeprecationWarning)
+ warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", DeprecationWarning)
from .results import Result
@@ -730,6 +738,7 @@ class Job:
result (Optional[Any]): The job return value.
"""
from .results import Result
+
if refresh:
self._cached_result = None
@@ -741,7 +750,7 @@ class Job:
if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL:
return self._cached_result.return_value
-
+
return None
@property
@@ -762,8 +771,7 @@ class Job:
seconds by default).
"""
- warnings.warn("job.result is deprecated, use job.return_value instead.",
- DeprecationWarning)
+ warnings.warn("job.result is deprecated, use job.return_value instead.", DeprecationWarning)
from .results import Result
@@ -789,6 +797,7 @@ class Job:
all_results (List[Result]): A list of 'Result' objects
"""
from .results import Result
+
return Result.all(self, serializer=self.serializer)
def latest_result(self) -> Optional['Result']:
@@ -796,9 +805,10 @@ class Job:
Returns:
result (Result): The Result object
- """
+ """
"""Returns the latest Result object"""
from .results import Result
+
return Result.fetch_latest(self, serializer=self.serializer)
def restore(self, raw_data) -> Any:
@@ -849,8 +859,7 @@ class Job:
dep_ids = obj.get('dependency_ids')
dep_id = obj.get('dependency_id') # for backwards compatibility
- self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
- else [dep_id.decode()] if dep_id else [])
+ self._dependency_ids = json.loads(dep_ids.decode()) if dep_ids else [dep_id.decode()] if dep_id else []
allow_failures = obj.get('allow_dependency_failures')
self.allow_dependency_failures = bool(int(allow_failures)) if allow_failures else None
self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None
@@ -902,7 +911,7 @@ class Job:
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '',
- 'worker_name': self.worker_name or ''
+ 'worker_name': self.worker_name or '',
}
if self.retries_left is not None:
@@ -948,8 +957,7 @@ class Job:
return obj
- def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True,
- include_result: bool = True):
+ def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True, include_result: bool = True):
"""Dumps the current job instance to its corresponding Redis key.
Exclude saving the `meta` dictionary by setting
@@ -980,7 +988,7 @@ class Job:
def get_redis_server_version(self) -> Tuple[int, int, int]:
"""Return Redis server version of connection
-
+
Returns:
redis_server_version (Tuple[int, int, int]): The Redis version within a Tuple of integers, eg (5, 0, 9)
"""
@@ -1016,15 +1024,13 @@ class Job:
raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
from .registry import CanceledJobRegistry
from .queue import Queue
+
pipe = pipeline or self.connection.pipeline()
while True:
try:
q = Queue(
- name=self.origin,
- connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer
+ name=self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
)
self.set_status(JobStatus.CANCELED, pipeline=pipe)
@@ -1033,16 +1039,10 @@ class Job:
if pipeline is None:
pipe.watch(self.dependents_key)
q.enqueue_dependents(self, pipeline=pipeline, exclude_job_id=self.id)
- self._remove_from_registries(
- pipeline=pipe,
- remove_from_queue=True
- )
+ self._remove_from_registries(pipeline=pipe, remove_from_queue=True)
registry = CanceledJobRegistry(
- self.origin,
- self.connection,
- job_class=self.__class__,
- serializer=self.serializer
+ self.origin, self.connection, job_class=self.__class__, serializer=self.serializer
)
registry.add(self, pipeline=pipe)
if pipeline is None:
@@ -1070,41 +1070,43 @@ class Job:
def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True):
from .registry import BaseRegistry
+
if remove_from_queue:
from .queue import Queue
+
q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer)
q.remove(self, pipeline=pipeline)
registry: BaseRegistry
if self.is_finished:
from .registry import FinishedJobRegistry
- registry = FinishedJobRegistry(self.origin,
- connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer)
+
+ registry = FinishedJobRegistry(
+ self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+ )
registry.remove(self, pipeline=pipeline)
elif self.is_deferred:
from .registry import DeferredJobRegistry
- registry = DeferredJobRegistry(self.origin,
- connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer)
+
+ registry = DeferredJobRegistry(
+ self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+ )
registry.remove(self, pipeline=pipeline)
elif self.is_started:
from .registry import StartedJobRegistry
- registry = StartedJobRegistry(self.origin,
- connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer)
+
+ registry = StartedJobRegistry(
+ self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+ )
registry.remove(self, pipeline=pipeline)
elif self.is_scheduled:
from .registry import ScheduledJobRegistry
- registry = ScheduledJobRegistry(self.origin,
- connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer)
+
+ registry = ScheduledJobRegistry(
+ self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+ )
registry.remove(self, pipeline=pipeline)
elif self.is_failed or self.is_stopped:
@@ -1112,13 +1114,15 @@ class Job:
elif self.is_canceled:
from .registry import CanceledJobRegistry
- registry = CanceledJobRegistry(self.origin, connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer)
+
+ registry = CanceledJobRegistry(
+ self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+ )
registry.remove(self, pipeline=pipeline)
- def delete(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True,
- delete_dependents: bool = False):
+ def delete(
+ self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, delete_dependents: bool = False
+ ):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well.
@@ -1184,7 +1188,7 @@ class Job:
'last_heartbeat': utcformat(self.last_heartbeat),
'status': self._status,
'started_at': utcformat(self.started_at), # type: ignore
- 'worker_name': worker_name
+ 'worker_name': worker_name,
}
if self.get_redis_server_version() >= (4, 0, 0):
pipeline.hset(self.key, mapping=mapping)
@@ -1199,7 +1203,7 @@ class Job:
Returns:
result (Any): The function result
- """
+ """
result = self.func(*self.args, **self.kwargs)
if asyncio.iscoroutine(result):
loop = asyncio.new_event_loop()
@@ -1214,7 +1218,7 @@ class Job:
Args:
default_ttl (Optional[int]): The default time to live for the job
-
+
Returns:
ttl (int): The time to live
"""
@@ -1227,7 +1231,7 @@ class Job:
Args:
default_ttl (Optional[int]): The default time to live for the job result
-
+
Returns:
ttl (int): The time to live for the result
"""
@@ -1244,8 +1248,7 @@ class Job:
call_repr = get_call_string(self.func_name, self.args, self.kwargs, max_length=75)
return call_repr
- def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None,
- remove_from_queue: bool = True):
+ def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True):
"""Prepare job for eventual deletion (if needed).
This method is usually called after successful execution.
How long we persist the job and its result depends on the value of ttl:
@@ -1271,16 +1274,18 @@ class Job:
@property
def started_job_registry(self):
from .registry import StartedJobRegistry
- return StartedJobRegistry(self.origin, connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer)
+
+ return StartedJobRegistry(
+ self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+ )
@property
def failed_job_registry(self):
from .registry import FailedJobRegistry
- return FailedJobRegistry(self.origin, connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer)
+
+ return FailedJobRegistry(
+ self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+ )
def get_retry_interval(self) -> int:
"""Returns the desired retry interval.
@@ -1332,10 +1337,9 @@ class Job:
"""
from .registry import DeferredJobRegistry
- registry = DeferredJobRegistry(self.origin,
- connection=self.connection,
- job_class=self.__class__,
- serializer=self.serializer)
+ registry = DeferredJobRegistry(
+ self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+ )
registry.add(self, pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection
@@ -1348,11 +1352,14 @@ class Job:
@property
def dependency_ids(self) -> List[bytes]:
dependencies = self.connection.smembers(self.dependencies_key)
- return [Job.key_for(_id.decode())
- for _id in dependencies]
-
- def dependencies_are_met(self, parent_job: Optional['Job'] = None, pipeline: Optional['Pipeline'] = None,
- exclude_job_id: Optional[str] = None) -> bool:
+ return [Job.key_for(_id.decode()) for _id in dependencies]
+
+ def dependencies_are_met(
+ self,
+ parent_job: Optional['Job'] = None,
+ pipeline: Optional['Pipeline'] = None,
+ exclude_job_id: Optional[str] = None,
+ ) -> bool:
"""Returns a boolean indicating if all of this job's dependencies are `FINISHED`
If a pipeline is passed, all dependencies are WATCHed.
@@ -1373,8 +1380,7 @@ class Job:
connection = pipeline if pipeline is not None else self.connection
if pipeline is not None:
- connection.watch(*[self.key_for(dependency_id)
- for dependency_id in self._dependency_ids])
+ connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids])
dependencies_ids = {_id.decode() for _id in connection.smembers(self.dependencies_key)}
@@ -1407,12 +1413,7 @@ class Job:
if self.allow_dependency_failures:
allowed_statuses.append(JobStatus.FAILED)
- return all(
- status.decode() in allowed_statuses
- for status
- in dependencies_statuses
- if status
- )
+ return all(status.decode() in allowed_statuses for status in dependencies_statuses if status)
_job_stack = LocalStack()
diff --git a/rq/local.py b/rq/local.py
index 458cd83..8e94457 100644
--- a/rq/local.py
+++ b/rq/local.py
@@ -122,6 +122,7 @@ class LocalStack:
def _set__ident_func__(self, value): # noqa
object.__setattr__(self._local, '__ident_func__', value)
+
__ident_func__ = property(_get__ident_func__, _set__ident_func__)
del _get__ident_func__, _set__ident_func__
@@ -131,6 +132,7 @@ class LocalStack:
if rv is None:
raise RuntimeError('object unbound')
return rv
+
return LocalProxy(_lookup)
def push(self, obj):
@@ -223,10 +225,7 @@ class LocalManager:
release_local(local)
def __repr__(self):
- return '<%s storages: %d>' % (
- self.__class__.__name__,
- len(self.locals)
- )
+ return '<%s storages: %d>' % (self.__class__.__name__, len(self.locals))
class LocalProxy:
@@ -264,6 +263,7 @@ class LocalProxy:
.. versionchanged:: 0.6.1
The class can be instanciated with a callable as well now.
"""
+
__slots__ = ('__local', '__dict__', '__name__')
def __init__(self, local, name=None):
diff --git a/rq/logutils.py b/rq/logutils.py
index 36a404d..33e0949 100644
--- a/rq/logutils.py
+++ b/rq/logutils.py
@@ -3,12 +3,15 @@ import sys
from typing import Union
from rq.utils import ColorizingStreamHandler
-from rq.defaults import (DEFAULT_LOGGING_FORMAT,
- DEFAULT_LOGGING_DATE_FORMAT)
+from rq.defaults import DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT
-def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
- log_format: str = DEFAULT_LOGGING_FORMAT, name: str = 'rq.worker'):
+def setup_loghandlers(
+ level: Union[int, str, None] = None,
+ date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
+ log_format: str = DEFAULT_LOGGING_FORMAT,
+ name: str = 'rq.worker',
+):
"""Sets up a log handler.
Args:
@@ -17,7 +20,7 @@ def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DE
date_format (str, optional): The date format to use. Defaults to DEFAULT_LOGGING_DATE_FORMAT ('%H:%M:%S').
log_format (str, optional): The log format to use. Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s').
name (str, optional): The looger name. Defaults to 'rq.worker'.
- """
+ """
logger = logging.getLogger(name)
if not _has_effective_handler(logger):
diff --git a/rq/queue.py b/rq/queue.py
index 79125ce..bc7f070 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -30,11 +30,27 @@ blue = make_colorizer('darkblue')
logger = logging.getLogger("rq.queue")
-
-class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout",
- "result_ttl", "ttl", "failure_ttl",
- "description", "job_id",
- "at_front", "meta", "retry", "on_success", "on_failure"])):
+class EnqueueData(
+ namedtuple(
+ 'EnqueueData',
+ [
+ "func",
+ "args",
+ "kwargs",
+ "timeout",
+ "result_ttl",
+ "ttl",
+ "failure_ttl",
+ "description",
+ "job_id",
+ "at_front",
+ "meta",
+ "retry",
+ "on_success",
+ "on_failure",
+ ],
+ )
+):
"""Helper type to use when calling enqueue_many
NOTE: Does not support `depends_on` yet.
"""
@@ -50,7 +66,9 @@ class Queue:
redis_queues_keys: str = 'rq:queues'
@classmethod
- def all(cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None) -> List['Queue']:
+ def all(
+ cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None
+ ) -> List['Queue']:
"""Returns an iterable of all Queues.
Args:
@@ -64,17 +82,22 @@ class Queue:
connection = resolve_connection(connection)
def to_queue(queue_key):
- return cls.from_queue_key(as_text(queue_key),
- connection=connection,
- job_class=job_class, serializer=serializer)
+ return cls.from_queue_key(
+ as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer
+ )
all_registerd_queues = connection.smembers(cls.redis_queues_keys)
all_queues = [to_queue(rq_key) for rq_key in all_registerd_queues if rq_key]
return all_queues
@classmethod
- def from_queue_key(cls, queue_key: str, connection: Optional['Redis'] = None,
- job_class: Optional['Job'] = None, serializer: Any = None) -> 'Queue':
+ def from_queue_key(
+ cls,
+ queue_key: str,
+ connection: Optional['Redis'] = None,
+ job_class: Optional['Job'] = None,
+ serializer: Any = None,
+ ) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
Redis keys.
@@ -94,11 +117,19 @@ class Queue:
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key))
- name = queue_key[len(prefix):]
+ name = queue_key[len(prefix) :]
return cls(name, connection=connection, job_class=job_class, serializer=serializer)
- def __init__(self, name: str = 'default', default_timeout: Optional[int] = None, connection: Optional['Redis'] = None,
- is_async: bool = True, job_class: Union[str, Type['Job'], None] = None, serializer: Any = None, **kwargs):
+ def __init__(
+ self,
+ name: str = 'default',
+ default_timeout: Optional[int] = None,
+ connection: Optional['Redis'] = None,
+ is_async: bool = True,
+ job_class: Union[str, Type['Job'], None] = None,
+ serializer: Any = None,
+ **kwargs,
+ ):
"""Initializes a Queue object.
Args:
@@ -109,7 +140,7 @@ class Queue:
If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True.
job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None.
serializer (Any, optional): Serializer. Defaults to None.
- """
+ """
self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix
self.name = name
@@ -145,7 +176,7 @@ class Queue:
def get_redis_server_version(self) -> Tuple[int, int, int]:
"""Return Redis server version of connection
-
+
Returns:
redis_version (Tuple): A tuple with the parsed Redis version (eg: (5,0,0))
"""
@@ -184,7 +215,7 @@ class Queue:
Returns:
script (...): The Lua Script is called.
- """
+ """
script = """
local prefix = "{0}"
local q = KEYS[1]
@@ -201,13 +232,17 @@ class Queue:
count = count + 1
end
return count
- """.format(self.job_class.redis_job_namespace_prefix).encode("utf-8")
+ """.format(
+ self.job_class.redis_job_namespace_prefix
+ ).encode(
+ "utf-8"
+ )
script = self.connection.register_script(script)
return script(keys=[self.key])
def delete(self, delete_jobs: bool = True):
"""Deletes the queue.
-
+
Args:
delete_jobs (bool): If true, removes all the associated messages on the queue first.
"""
@@ -221,7 +256,7 @@ class Queue:
def is_empty(self) -> bool:
"""Returns whether the current queue is empty.
-
+
Returns:
is_empty (bool): Whether the queue is empty
"""
@@ -242,7 +277,7 @@ class Queue:
Returns:
job (Optional[Job]): The job if found
- """
+ """
try:
job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer)
except NoSuchJobError:
@@ -251,7 +286,7 @@ class Queue:
if job.origin == self.name:
return job
- def get_job_position(self, job_or_id: Union['Job', str]) -> Optional[int]:
+ def get_job_position(self, job_or_id: Union['Job', str]) -> Optional[int]:
"""Returns the position of a job within the queue
Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of
@@ -293,11 +328,7 @@ class Queue:
end = offset + (length - 1)
else:
end = length
- job_ids = [
- as_text(job_id)
- for job_id
- in self.connection.lrange(self.key, start, end)
- ]
+ job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)]
self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.")
return job_ids
@@ -333,18 +364,21 @@ class Queue:
def failed_job_registry(self):
"""Returns this queue's FailedJobRegistry."""
from rq.registry import FailedJobRegistry
+
return FailedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property
def started_job_registry(self):
"""Returns this queue's StartedJobRegistry."""
from rq.registry import StartedJobRegistry
+
return StartedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property
def finished_job_registry(self):
"""Returns this queue's FinishedJobRegistry."""
from rq.registry import FinishedJobRegistry
+
# TODO: Why was job_class only ommited here before? Was it intentional?
return FinishedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@@ -352,18 +386,21 @@ class Queue:
def deferred_job_registry(self):
"""Returns this queue's DeferredJobRegistry."""
from rq.registry import DeferredJobRegistry
+
return DeferredJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property
def scheduled_job_registry(self):
"""Returns this queue's ScheduledJobRegistry."""
from rq.registry import ScheduledJobRegistry
+
return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property
def canceled_job_registry(self):
"""Returns this queue's CanceledJobRegistry."""
from rq.registry import CanceledJobRegistry
+
return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
def remove(self, job_or_id: Union['Job', str], pipeline: Optional['Pipeline'] = None):
@@ -387,8 +424,7 @@ class Queue:
"""Removes all "dead" jobs from the queue by cycling through it,
while guaranteeing FIFO semantics.
"""
- COMPACT_QUEUE = '{0}_compact:{1}'.format(
- self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa
+ COMPACT_QUEUE = '{0}_compact:{1}'.format(self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa
self.connection.rename(self.key, COMPACT_QUEUE)
while True:
@@ -414,12 +450,25 @@ class Queue:
result = connection.rpush(self.key, job_id)
self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.")
- def create_job(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType']=None,
- job_id: Optional[str] = None, meta: Optional[Dict] = None, status: JobStatus = JobStatus.QUEUED,
- retry: Optional['Retry'] = None, *, on_success: Optional[Callable] = None,
- on_failure: Optional[Callable] = None) -> Job:
+ def create_job(
+ self,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ depends_on: Optional['JobDependencyType'] = None,
+ job_id: Optional[str] = None,
+ meta: Optional[Dict] = None,
+ status: JobStatus = JobStatus.QUEUED,
+ retry: Optional['Retry'] = None,
+ *,
+ on_success: Optional[Callable] = None,
+ on_failure: Optional[Callable] = None,
+ ) -> Job:
"""Creates a job based on parameters given
Args:
@@ -461,12 +510,23 @@ class Queue:
raise ValueError('Job ttl must be greater than 0')
job = self.job_class.create(
- func, args=args, kwargs=kwargs, connection=self.connection,
- result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
- status=status, description=description,
- depends_on=depends_on, timeout=timeout, id=job_id,
- origin=self.name, meta=meta, serializer=self.serializer, on_success=on_success,
- on_failure=on_failure
+ func,
+ args=args,
+ kwargs=kwargs,
+ connection=self.connection,
+ result_ttl=result_ttl,
+ ttl=ttl,
+ failure_ttl=failure_ttl,
+ status=status,
+ description=description,
+ depends_on=depends_on,
+ timeout=timeout,
+ id=job_id,
+ origin=self.name,
+ meta=meta,
+ serializer=self.serializer,
+ on_success=on_success,
+ on_failure=on_failure,
)
if retry:
@@ -501,10 +561,7 @@ class Queue:
# is called from within this method.
pipe.watch(job.dependencies_key)
- dependencies = job.fetch_dependencies(
- watch=True,
- pipeline=pipe
- )
+ dependencies = job.fetch_dependencies(watch=True, pipeline=pipe)
pipe.multi()
@@ -535,12 +592,25 @@ class Queue:
pipeline.multi() # Ensure pipeline in multi mode before returning to caller
return job
- def enqueue_call(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType'] = None,
- job_id: Optional[str] = None, at_front: bool = False, meta: Optional[Dict] = None,
- retry: Optional['Retry'] = None, on_success: Optional[Callable[..., Any]] = None,
- on_failure: Optional[Callable[..., Any]] = None, pipeline: Optional['Pipeline'] = None) -> Job:
+ def enqueue_call(
+ self,
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ depends_on: Optional['JobDependencyType'] = None,
+ job_id: Optional[str] = None,
+ at_front: bool = False,
+ meta: Optional[Dict] = None,
+ retry: Optional['Retry'] = None,
+ on_success: Optional[Callable[..., Any]] = None,
+ on_failure: Optional[Callable[..., Any]] = None,
+ pipeline: Optional['Pipeline'] = None,
+ ) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
It is much like `.enqueue()`, except that it takes the function's args
@@ -567,30 +637,49 @@ class Queue:
Returns:
Job: The enqueued Job
- """
+ """
job = self.create_job(
- func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
- failure_ttl=failure_ttl, description=description, depends_on=depends_on,
- job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
- retry=retry, on_success=on_success, on_failure=on_failure
+ func,
+ args=args,
+ kwargs=kwargs,
+ result_ttl=result_ttl,
+ ttl=ttl,
+ failure_ttl=failure_ttl,
+ description=description,
+ depends_on=depends_on,
+ job_id=job_id,
+ meta=meta,
+ status=JobStatus.QUEUED,
+ timeout=timeout,
+ retry=retry,
+ on_success=on_success,
+ on_failure=on_failure,
)
- job = self.setup_dependencies(
- job,
- pipeline=pipeline
- )
+ job = self.setup_dependencies(job, pipeline=pipeline)
# If we do not depend on an unfinished job, enqueue the job.
if job.get_status(refresh=False) != JobStatus.DEFERRED:
return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
return job
@staticmethod
- def prepare_data(func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None,
- timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None,
- failure_ttl: Optional[int] = None, description: Optional[str] = None, job_id: Optional[str] = None,
- at_front: bool = False, meta: Optional[Dict] = None, retry: Optional['Retry'] = None,
- on_success: Optional[Callable] = None, on_failure: Optional[Callable] = None) -> EnqueueData:
+ def prepare_data(
+ func: 'FunctionReferenceType',
+ args: Union[Tuple, List, None] = None,
+ kwargs: Optional[Dict] = None,
+ timeout: Optional[int] = None,
+ result_ttl: Optional[int] = None,
+ ttl: Optional[int] = None,
+ failure_ttl: Optional[int] = None,
+ description: Optional[str] = None,
+ job_id: Optional[str] = None,
+ at_front: bool = False,
+ meta: Optional[Dict] = None,
+ retry: Optional['Retry'] = None,
+ on_success: Optional[Callable] = None,
+ on_failure: Optional[Callable] = None,
+ ) -> EnqueueData:
"""Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
And can keep this logic within EnqueueData
@@ -614,10 +703,20 @@ class Queue:
EnqueueData: The EnqueueData
"""
return EnqueueData(
- func, args, kwargs, timeout,
- result_ttl, ttl, failure_ttl,
- description, job_id,
- at_front, meta, retry, on_success, on_failure
+ func,
+ args,
+ kwargs,
+ timeout,
+ result_ttl,
+ ttl,
+ failure_ttl,
+ description,
+ job_id,
+ at_front,
+ meta,
+ retry,
+ on_success,
+ on_failure,
)
def enqueue_many(self, job_datas: List['EnqueueData'], pipeline: Optional['Pipeline'] = None) -> List[Job]:
@@ -635,18 +734,24 @@ class Queue:
jobs = [
self.enqueue_job(
self.create_job(
- job_data.func, args=job_data.args, kwargs=job_data.kwargs, result_ttl=job_data.result_ttl,
+ job_data.func,
+ args=job_data.args,
+ kwargs=job_data.kwargs,
+ result_ttl=job_data.result_ttl,
ttl=job_data.ttl,
- failure_ttl=job_data.failure_ttl, description=job_data.description,
+ failure_ttl=job_data.failure_ttl,
+ description=job_data.description,
depends_on=None,
- job_id=job_data.job_id, meta=job_data.meta, status=JobStatus.QUEUED,
+ job_id=job_data.job_id,
+ meta=job_data.meta,
+ status=JobStatus.QUEUED,
timeout=job_data.timeout,
retry=job_data.retry,
on_success=job_data.on_success,
- on_failure=job_data.on_failure
+ on_failure=job_data.on_failure,
),
pipeline=pipe,
- at_front=job_data.at_front
+ at_front=job_data.at_front,
)
for job_data in job_datas
]
@@ -662,7 +767,7 @@ class Queue:
Returns:
Job: _description_
- """
+ """
job.perform()
job.set_status(JobStatus.FINISHED)
job.save(include_meta=False)
@@ -687,8 +792,7 @@ class Queue:
kwargs (*kwargs): function kargs
"""
if not isinstance(f, str) and f.__module__ == '__main__':
- raise ValueError('Functions from the __main__ module cannot be processed '
- 'by workers')
+ raise ValueError('Functions from the __main__ module cannot be processed ' 'by workers')
# Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30)
@@ -711,9 +815,24 @@ class Queue:
args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None)
- return (f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, retry, on_success, on_failure,
- pipeline, args, kwargs)
+ return (
+ f,
+ timeout,
+ description,
+ result_ttl,
+ ttl,
+ failure_ttl,
+ depends_on,
+ job_id,
+ at_front,
+ meta,
+ retry,
+ on_success,
+ on_failure,
+ pipeline,
+ args,
+ kwargs,
+ )
def enqueue(self, f: 'FunctionReferenceType', *args, **kwargs) -> 'Job':
"""Creates a job to represent the delayed function call and enqueues it.
@@ -727,16 +846,42 @@ class Queue:
Returns:
job (Job): The created Job
"""
- (f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, retry, on_success,
- on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+ (
+ f,
+ timeout,
+ description,
+ result_ttl,
+ ttl,
+ failure_ttl,
+ depends_on,
+ job_id,
+ at_front,
+ meta,
+ retry,
+ on_success,
+ on_failure,
+ pipeline,
+ args,
+ kwargs,
+ ) = Queue.parse_args(f, *args, **kwargs)
return self.enqueue_call(
- func=f, args=args, kwargs=kwargs, timeout=timeout,
- result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
- description=description, depends_on=depends_on, job_id=job_id,
- at_front=at_front, meta=meta, retry=retry, on_success=on_success, on_failure=on_failure,
- pipeline=pipeline
+ func=f,
+ args=args,
+ kwargs=kwargs,
+ timeout=timeout,
+ result_ttl=result_ttl,
+ ttl=ttl,
+ failure_ttl=failure_ttl,
+ description=description,
+ depends_on=depends_on,
+ job_id=job_id,
+ at_front=at_front,
+ meta=meta,
+ retry=retry,
+ on_success=on_success,
+ on_failure=on_failure,
+ pipeline=pipeline,
)
def enqueue_at(self, datetime: datetime, f, *args, **kwargs):
@@ -749,14 +894,41 @@ class Queue:
Returns:
_type_: _description_
"""
- (f, timeout, description, result_ttl, ttl, failure_ttl,
- depends_on, job_id, at_front, meta, retry, on_success, on_failure,
- pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
- job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
- timeout=timeout, result_ttl=result_ttl, ttl=ttl,
- failure_ttl=failure_ttl, description=description,
- depends_on=depends_on, job_id=job_id, meta=meta, retry=retry,
- on_success=on_success, on_failure=on_failure)
+ (
+ f,
+ timeout,
+ description,
+ result_ttl,
+ ttl,
+ failure_ttl,
+ depends_on,
+ job_id,
+ at_front,
+ meta,
+ retry,
+ on_success,
+ on_failure,
+ pipeline,
+ args,
+ kwargs,
+ ) = Queue.parse_args(f, *args, **kwargs)
+ job = self.create_job(
+ f,
+ status=JobStatus.SCHEDULED,
+ args=args,
+ kwargs=kwargs,
+ timeout=timeout,
+ result_ttl=result_ttl,
+ ttl=ttl,
+ failure_ttl=failure_ttl,
+ description=description,
+ depends_on=depends_on,
+ job_id=job_id,
+ meta=meta,
+ retry=retry,
+ on_success=on_success,
+ on_failure=on_failure,
+ )
if at_front:
job.enqueue_at_front = True
return self.schedule_job(job, datetime, pipeline=pipeline)
@@ -773,6 +945,7 @@ class Queue:
_type_: _description_
"""
from .registry import ScheduledJobRegistry
+
registry = ScheduledJobRegistry(queue=self)
pipe = pipeline if pipeline is not None else self.connection.pipeline()
@@ -795,8 +968,7 @@ class Queue:
Returns:
job (Job): The enqueued Job
"""
- return self.enqueue_at(datetime.now(timezone.utc) + time_delta,
- func, *args, **kwargs)
+ return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs)
def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
"""Enqueues a job for delayed execution.
@@ -838,14 +1010,14 @@ class Queue:
return job
def run_sync(self, job: 'Job') -> 'Job':
- """Run a job synchronously, meaning on the same process the method was called.
+ """Run a job synchronously, meaning on the same process the method was called.
Args:
job (Job): The job to run
Returns:
Job: The job instance
- """
+ """
with self.connection.pipeline() as pipeline:
job.prepare_for_execution('sync', pipeline)
@@ -861,7 +1033,9 @@ class Queue:
return job
- def enqueue_dependents(self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None):
+ def enqueue_dependents(
+ self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
+ ):
"""Enqueues all jobs in the given job's dependents set and clears it.
When called without a pipeline, this method uses WATCH/MULTI/EXEC.
@@ -886,20 +1060,19 @@ class Queue:
if pipeline is None:
pipe.watch(dependents_key)
- dependent_job_ids = {as_text(_id)
- for _id in pipe.smembers(dependents_key)}
+ dependent_job_ids = {as_text(_id) for _id in pipe.smembers(dependents_key)}
# There's no dependents
if not dependent_job_ids:
break
jobs_to_enqueue = [
- dependent_job for dependent_job
- in self.job_class.fetch_many(
- dependent_job_ids,
- connection=self.connection,
- serializer=self.serializer
- ) if dependent_job and dependent_job.dependencies_are_met(
+ dependent_job
+ for dependent_job in self.job_class.fetch_many(
+ dependent_job_ids, connection=self.connection, serializer=self.serializer
+ )
+ if dependent_job
+ and dependent_job.dependencies_are_met(
parent_job=job,
pipeline=pipe,
exclude_job_id=exclude_job_id,
@@ -914,10 +1087,9 @@ class Queue:
for dependent in jobs_to_enqueue:
enqueue_at_front = dependent.enqueue_at_front or False
- registry = DeferredJobRegistry(dependent.origin,
- self.connection,
- job_class=self.job_class,
- serializer=self.serializer)
+ registry = DeferredJobRegistry(
+ dependent.origin, self.connection, job_class=self.job_class, serializer=self.serializer
+ )
registry.remove(dependent, pipeline=pipe)
if dependent.origin == self.name:
@@ -1000,8 +1172,14 @@ class Queue:
return None
@classmethod
- def dequeue_any(cls, queues: List['Queue'], timeout: int, connection: Optional['Redis'] = None,
- job_class: Optional['Job'] = None, serializer: Any = None) -> Tuple['Job', 'Queue']:
+ def dequeue_any(
+ cls,
+ queues: List['Queue'],
+ timeout: int,
+ connection: Optional['Redis'] = None,
+ job_class: Optional['Job'] = None,
+ serializer: Any = None,
+ ) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.
@@ -1033,10 +1211,7 @@ class Queue:
if result is None:
return None
queue_key, job_id = map(as_text, result)
- queue = cls.from_queue_key(queue_key,
- connection=connection,
- job_class=job_class,
- serializer=serializer)
+ queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, serializer=serializer)
try:
job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError:
diff --git a/rq/registry.py b/rq/registry.py
index f581776..509bd87 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -23,11 +23,18 @@ class BaseRegistry:
Each job is stored as a key in the registry, scored by expiration time
(unix timestamp).
"""
+
job_class = Job
key_template = 'rq:registry:{0}'
- def __init__(self, name: str = 'default', connection: Optional['Redis'] = None,
- job_class: Optional[Type['Job']] = None, queue: Optional['Queue'] = None, serializer: Any = None):
+ def __init__(
+ self,
+ name: str = 'default',
+ connection: Optional['Redis'] = None,
+ job_class: Optional[Type['Job']] = None,
+ queue: Optional['Queue'] = None,
+ serializer: Any = None,
+ ):
if queue:
self.name = queue.name
self.connection = resolve_connection(queue.connection)
@@ -46,8 +53,8 @@ class BaseRegistry:
def __eq__(self, other):
return (
- self.name == other.name and
- self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
+ self.name == other.name
+ and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
)
def __contains__(self, item: Union[str, 'Job']):
@@ -134,8 +141,7 @@ class BaseRegistry:
_type_: _description_
"""
self.cleanup()
- return [as_text(job_id) for job_id in
- self.connection.zrange(self.key, start, end)]
+ return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)]
def get_queue(self):
"""Returns Queue object associated with this registry."""
@@ -175,8 +181,7 @@ class BaseRegistry:
raise InvalidJobOperation
with self.connection.pipeline() as pipeline:
- queue = Queue(job.origin, connection=self.connection,
- job_class=self.job_class, serializer=serializer)
+ queue = Queue(job.origin, connection=self.connection, job_class=self.job_class, serializer=serializer)
job.started_at = None
job.ended_at = None
job._exc_info = ''
@@ -195,6 +200,7 @@ class StartedJobRegistry(BaseRegistry):
Jobs are added to registry right before they are executed and removed
right after completion (success or failure).
"""
+
key_template = 'rq:wip:{0}'
def cleanup(self, timestamp: Optional[float] = None):
@@ -216,9 +222,7 @@ class StartedJobRegistry(BaseRegistry):
with self.connection.pipeline() as pipeline:
for job_id in job_ids:
try:
- job = self.job_class.fetch(job_id,
- connection=self.connection,
- serializer=self.serializer)
+ job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer)
except NoSuchJobError:
continue
@@ -246,6 +250,7 @@ class FinishedJobRegistry(BaseRegistry):
Registry of jobs that have been completed. Jobs are added to this
registry after they have successfully completed for monitoring purposes.
"""
+
key_template = 'rq:finished:{0}'
def cleanup(self, timestamp: Optional[float] = None):
@@ -263,6 +268,7 @@ class FailedJobRegistry(BaseRegistry):
"""
Registry of containing failed jobs.
"""
+
key_template = 'rq:failed:{0}'
def cleanup(self, timestamp: Optional[float] = None):
@@ -275,8 +281,14 @@ class FailedJobRegistry(BaseRegistry):
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)
- def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: Optional['Pipeline'] = None,
- _save_exc_to_job: bool = False):
+ def add(
+ self,
+ job: 'Job',
+ ttl=None,
+ exc_string: str = '',
+ pipeline: Optional['Pipeline'] = None,
+ _save_exc_to_job: bool = False,
+ ):
"""
Adds a job to a registry with expiry time of now + ttl.
`ttl` defaults to DEFAULT_FAILURE_TTL if not specified.
@@ -303,6 +315,7 @@ class DeferredJobRegistry(BaseRegistry):
"""
Registry of deferred jobs (waiting for another job to finish).
"""
+
key_template = 'rq:deferred:{0}'
def cleanup(self):
@@ -316,6 +329,7 @@ class ScheduledJobRegistry(BaseRegistry):
"""
Registry of scheduled jobs.
"""
+
key_template = 'rq:scheduled:{0}'
def __init__(self, *args, **kwargs):
@@ -396,7 +410,7 @@ class ScheduledJobRegistry(BaseRegistry):
class CanceledJobRegistry(BaseRegistry):
key_template = 'rq:canceled:{0}'
- def get_expired_job_ids(self, timestamp: Optional[datetime] = None):
+ def get_expired_job_ids(self, timestamp: Optional[datetime] = None):
raise NotImplementedError
def cleanup(self):
@@ -412,19 +426,16 @@ def clean_registries(queue: 'Queue'):
Args:
queue (Queue): The queue to clean
"""
- registry = FinishedJobRegistry(name=queue.name,
- connection=queue.connection,
- job_class=queue.job_class,
- serializer=queue.serializer)
+ registry = FinishedJobRegistry(
+ name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer
+ )
registry.cleanup()
- registry = StartedJobRegistry(name=queue.name,
- connection=queue.connection,
- job_class=queue.job_class,
- serializer=queue.serializer)
+ registry = StartedJobRegistry(
+ name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer
+ )
registry.cleanup()
- registry = FailedJobRegistry(name=queue.name,
- connection=queue.connection,
- job_class=queue.job_class,
- serializer=queue.serializer)
+ registry = FailedJobRegistry(
+ name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer
+ )
registry.cleanup()
diff --git a/rq/results.py b/rq/results.py
index 0df131a..a6dafde 100644
--- a/rq/results.py
+++ b/rq/results.py
@@ -17,15 +17,22 @@ def get_key(job_id):
class Result(object):
-
class Type(Enum):
SUCCESSFUL = 1
FAILED = 2
STOPPED = 3
- def __init__(self, job_id: str, type: Type, connection: Redis, id: Optional[str] = None,
- created_at: Optional[datetime] = None, return_value: Optional[Any] = None,
- exc_string: Optional[str] = None, serializer=None):
+ def __init__(
+ self,
+ job_id: str,
+ type: Type,
+ connection: Redis,
+ id: Optional[str] = None,
+ created_at: Optional[datetime] = None,
+ return_value: Optional[Any] = None,
+ exc_string: Optional[str] = None,
+ serializer=None,
+ ):
self.return_value = return_value
self.exc_string = exc_string
self.type = type
@@ -49,16 +56,26 @@ class Result(object):
@classmethod
def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None):
- result = cls(job_id=job.id, type=type, connection=job.connection,
- return_value=return_value,
- exc_string=exc_string, serializer=job.serializer)
+ result = cls(
+ job_id=job.id,
+ type=type,
+ connection=job.connection,
+ return_value=return_value,
+ exc_string=exc_string,
+ serializer=job.serializer,
+ )
result.save(ttl=ttl, pipeline=pipeline)
return result
@classmethod
def create_failure(cls, job, ttl, exc_string, pipeline=None):
- result = cls(job_id=job.id, type=cls.Type.FAILED, connection=job.connection,
- exc_string=exc_string, serializer=job.serializer)
+ result = cls(
+ job_id=job.id,
+ type=cls.Type.FAILED,
+ connection=job.connection,
+ exc_string=exc_string,
+ serializer=job.serializer,
+ )
result.save(ttl=ttl, pipeline=pipeline)
return result
@@ -70,8 +87,7 @@ class Result(object):
results = []
for (result_id, payload) in response:
results.append(
- cls.restore(job.id, result_id.decode(), payload,
- connection=job.connection, serializer=serializer)
+ cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
)
return results
@@ -89,9 +105,7 @@ class Result(object):
@classmethod
def restore(cls, job_id: str, result_id: str, payload: dict, connection: Redis, serializer=None) -> 'Result':
"""Create a Result object from given Redis payload"""
- created_at = datetime.fromtimestamp(
- int(result_id.split('-')[0]) / 1000, tz=timezone.utc
- )
+ created_at = datetime.fromtimestamp(int(result_id.split('-')[0]) / 1000, tz=timezone.utc)
payload = decode_redis_hash(payload)
# data, timestamp = payload
# result_data = json.loads(data)
@@ -106,11 +120,15 @@ class Result(object):
if exc_string:
exc_string = zlib.decompress(b64decode(exc_string)).decode()
- return Result(job_id, Result.Type(int(payload['type'])), connection=connection,
- id=result_id,
- created_at=created_at,
- return_value=return_value,
- exc_string=exc_string)
+ return Result(
+ job_id,
+ Result.Type(int(payload['type'])),
+ connection=connection,
+ id=result_id,
+ created_at=created_at,
+ return_value=return_value,
+ exc_string=exc_string,
+ )
@classmethod
def fetch(cls, job: Job, serializer=None) -> Optional['Result']:
@@ -134,8 +152,7 @@ class Result(object):
return None
result_id, payload = response[0]
- return cls.restore(job.id, result_id.decode(), payload,
- connection=job.connection, serializer=serializer)
+ return cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
@classmethod
def get_key(cls, job_id):
diff --git a/rq/scheduler.py b/rq/scheduler.py
index de8f26e..da59b0d 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -9,8 +9,7 @@ from multiprocessing import Process
from redis import SSLConnection, UnixDomainSocketConnection
-from .defaults import (DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT,
- DEFAULT_SCHEDULER_FALLBACK_PERIOD)
+from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, DEFAULT_SCHEDULER_FALLBACK_PERIOD
from .job import Job
from .logutils import setup_loghandlers
from .queue import Queue
@@ -35,9 +34,16 @@ class RQScheduler:
Status = SchedulerStatus
- def __init__(self, queues, connection, interval=1, logging_level=logging.INFO,
- date_format=DEFAULT_LOGGING_DATE_FORMAT,
- log_format=DEFAULT_LOGGING_FORMAT, serializer=None):
+ def __init__(
+ self,
+ queues,
+ connection,
+ interval=1,
+ logging_level=logging.INFO,
+ date_format=DEFAULT_LOGGING_DATE_FORMAT,
+ log_format=DEFAULT_LOGGING_FORMAT,
+ serializer=None,
+ ):
self._queue_names = set(parse_names(queues))
self._acquired_locks = set()
self._scheduled_job_registries = []
@@ -59,9 +65,7 @@ class RQScheduler:
# the key is necessary.
# `path` is not left in the dictionary as that keyword argument is
# not expected by `redis.client.Redis` and would raise an exception.
- self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop(
- 'path'
- )
+ self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop('path')
self.serializer = resolve_serializer(serializer)
self._connection = None
@@ -158,9 +162,7 @@ class RQScheduler:
queue = Queue(registry.name, connection=self.connection, serializer=self.serializer)
with self.connection.pipeline() as pipeline:
- jobs = Job.fetch_many(
- job_ids, connection=self.connection, serializer=self.serializer
- )
+ jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer)
for job in jobs:
if job is not None:
queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
@@ -181,8 +183,7 @@ class RQScheduler:
def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks"""
- self.log.debug("Scheduler sending heartbeat to %s",
- ", ".join(self.acquired_locks))
+ self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks))
if len(self._queue_names) > 1:
with self.connection.pipeline() as pipeline:
for name in self._acquired_locks:
@@ -194,8 +195,7 @@ class RQScheduler:
self.connection.expire(key, self.interval + 60)
def stop(self):
- self.log.info("Scheduler stopping, releasing locks for %s...",
- ','.join(self._queue_names))
+ self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names))
self.release_locks()
self._status = self.Status.STOPPED
@@ -231,15 +231,11 @@ class RQScheduler:
def run(scheduler):
- scheduler.log.info("Scheduler for %s started with PID %s",
- ','.join(scheduler._queue_names), os.getpid())
+ scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid())
try:
scheduler.work()
except: # noqa
- scheduler.log.error(
- 'Scheduler [PID %s] raised an exception.\n%s',
- os.getpid(), traceback.format_exc()
- )
+ scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc())
raise
scheduler.log.info("Scheduler with PID %s has stopped", os.getpid())
diff --git a/rq/serializers.py b/rq/serializers.py
index 9e63bc7..b9b7d9c 100644
--- a/rq/serializers.py
+++ b/rq/serializers.py
@@ -11,7 +11,7 @@ class DefaultSerializer:
loads = pickle.loads
-class JSONSerializer():
+class JSONSerializer:
@staticmethod
def dumps(*args, **kwargs):
return json.dumps(*args, **kwargs).encode('utf-8')
@@ -29,7 +29,7 @@ def resolve_serializer(serializer=None):
Args:
serializer (Callable): The serializer to resolve.
-
+
Returns:
serializer (Callable): An object that implements the SerializerProtocol
"""
diff --git a/rq/timeouts.py b/rq/timeouts.py
index c9f1e44..a1401c5 100644
--- a/rq/timeouts.py
+++ b/rq/timeouts.py
@@ -5,6 +5,7 @@ import threading
class BaseTimeoutException(Exception):
"""Base exception for timeouts."""
+
pass
@@ -12,6 +13,7 @@ class JobTimeoutException(BaseTimeoutException):
"""Raised when a job takes longer to complete than the allowed maximum
timeout value.
"""
+
pass
@@ -19,6 +21,7 @@ class HorseMonitorTimeoutException(BaseTimeoutException):
"""Raised when waiting for a horse exiting takes longer than the maximum
timeout value.
"""
+
pass
@@ -56,10 +59,8 @@ class BaseDeathPenalty:
class UnixSignalDeathPenalty(BaseDeathPenalty):
-
def handle_death_penalty(self, signum, frame):
- raise self._exception('Task exceeded maximum timeout value '
- '({0} seconds)'.format(self._timeout))
+ raise self._exception('Task exceeded maximum timeout value ' '({0} seconds)'.format(self._timeout))
def setup_death_penalty(self):
"""Sets up an alarm signal and a signal handler that raises
@@ -85,15 +86,12 @@ class TimerDeathPenalty(BaseDeathPenalty):
# Monkey-patch exception with the message ahead of time
# since PyThreadState_SetAsyncExc can only take a class
def init_with_message(self, *args, **kwargs): # noqa
- super(exception, self).__init__(
- "Task exceeded maximum timeout value ({0} seconds)".format(timeout)
- )
+ super(exception, self).__init__("Task exceeded maximum timeout value ({0} seconds)".format(timeout))
self._exception.__init__ = init_with_message
def new_timer(self):
- """Returns a new timer since timers can only be used once.
- """
+ """Returns a new timer since timers can only be used once."""
return threading.Timer(self._timeout, self.handle_death_penalty)
def handle_death_penalty(self):
@@ -111,13 +109,11 @@ class TimerDeathPenalty(BaseDeathPenalty):
raise SystemError("PyThreadState_SetAsyncExc failed")
def setup_death_penalty(self):
- """Starts the timer.
- """
+ """Starts the timer."""
self._timer = self.new_timer()
self._timer.start()
def cancel_death_penalty(self):
- """Cancels the timer.
- """
+ """Cancels the timer."""
self._timer.cancel()
self._timer = None
diff --git a/rq/utils.py b/rq/utils.py
index 8993487..1c3fa01 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -41,10 +41,8 @@ class _Colorizer:
self.codes["blink"] = esc + "05m"
self.codes["overline"] = esc + "06m"
- dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue",
- "purple", "teal", "lightgray"]
- light_colors = ["darkgray", "red", "green", "yellow", "blue",
- "fuchsia", "turquoise", "white"]
+ dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"]
+ light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"]
x = 30
for d, l in zip(dark_colors, light_colors):
@@ -90,8 +88,10 @@ def make_colorizer(color: str):
>>> # You can then use:
>>> print("It's either " + green('OK') + ' or ' + red('Oops'))
"""
+
def inner(text):
return colorizer.colorize(color, text)
+
return inner
@@ -134,7 +134,7 @@ def compact(lst: List[Any]) -> List[Any]:
Returns:
object (list): The list without None values
- """
+ """
return [item for item in lst if item is not None]
@@ -149,7 +149,7 @@ def as_text(v: Union[bytes, str]) -> Optional[str]:
Returns:
value (Optional[str]): Either the decoded string or None
- """
+ """
if v is None:
return None
elif isinstance(v, bytes):
@@ -169,7 +169,7 @@ def decode_redis_hash(h) -> Dict[str, Any]:
Returns:
Dict[str, Any]: The decoded Redis data (Dictionary)
- """
+ """
return dict((as_text(k), h[k]) for k in h)
@@ -230,8 +230,7 @@ def utcnow():
def now():
- """Return now in UTC
- """
+ """Return now in UTC"""
return datetime.datetime.now(datetime.timezone.utc)
@@ -356,8 +355,7 @@ def str_to_date(date_str: Optional[str]) -> Union[dt.datetime, Any]:
def parse_timeout(timeout: Any):
- """Transfer all kinds of timeout format to an integer representing seconds
- """
+ """Transfer all kinds of timeout format to an integer representing seconds"""
if not isinstance(timeout, numbers.Integral) and timeout is not None:
try:
timeout = int(timeout)
@@ -367,9 +365,11 @@ def parse_timeout(timeout: Any):
try:
timeout = int(digit) * unit_second[unit]
except (ValueError, KeyError):
- raise TimeoutFormatError('Timeout must be an integer or a string representing an integer, or '
- 'a string with format: digits + unit, unit can be "d", "h", "m", "s", '
- 'such as "1h", "23m".')
+ raise TimeoutFormatError(
+ 'Timeout must be an integer or a string representing an integer, or '
+ 'a string with format: digits + unit, unit can be "d", "h", "m", "s", '
+ 'such as "1h", "23m".'
+ )
return timeout
@@ -381,7 +381,7 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]:
Args:
connection (Redis): The Redis connection.
-
+
Returns:
version (Tuple[int, int, int]): A tuple representing the semantic versioning format (eg. (5, 0, 9))
"""
@@ -391,7 +391,7 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]:
setattr(
connection,
"__rq_redis_server_version",
- tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3])
+ tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]),
)
return getattr(connection, "__rq_redis_server_version")
except ResponseError: # fakeredis doesn't implement Redis' INFO command
@@ -422,7 +422,7 @@ def split_list(a_list: List[Any], segment_size: int):
list: The splitted listed
"""
for i in range(0, len(a_list), segment_size):
- yield a_list[i:i + segment_size]
+ yield a_list[i : i + segment_size]
def truncate_long_string(data: str, max_length: Optional[int] = None) -> str:
@@ -431,7 +431,7 @@ def truncate_long_string(data: str, max_length: Optional[int] = None) -> str:
Args:
data (str): The data to truncate
max_length (Optional[int], optional): The max length. Defaults to None.
-
+
Returns:
truncated (str): The truncated string
"""
@@ -440,8 +440,9 @@ def truncate_long_string(data: str, max_length: Optional[int] = None) -> str:
return (data[:max_length] + '...') if len(data) > max_length else data
-def get_call_string(func_name: Optional[str], args: Any, kwargs: Dict[Any, Any],
- max_length: Optional[int] = None) -> Optional[str]:
+def get_call_string(
+ func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], max_length: Optional[int] = None
+) -> Optional[str]:
"""
Returns a string representation of the call, formatted as a regular
Python function invocation statement. If max_length is not None, truncate
diff --git a/rq/worker.py b/rq/worker.py
index 35355c7..06a707e 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -34,9 +34,15 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .utils import as_text
from .connections import get_current_connection, push_connection, pop_connection
-from .defaults import (CALLBACK_TIMEOUT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL,
- DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
- DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
+from .defaults import (
+ CALLBACK_TIMEOUT,
+ DEFAULT_MAINTENANCE_TASK_INTERVAL,
+ DEFAULT_RESULT_TTL,
+ DEFAULT_WORKER_TTL,
+ DEFAULT_JOB_MONITORING_INTERVAL,
+ DEFAULT_LOGGING_FORMAT,
+ DEFAULT_LOGGING_DATE_FORMAT,
+)
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import setup_loghandlers
@@ -46,8 +52,7 @@ from .results import Result
from .scheduler import RQScheduler
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
-from .utils import (backend_class, ensure_list, get_version,
- make_colorizer, utcformat, utcnow, utcparse, compact)
+from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact
from .version import VERSION
from .worker_registration import clean_worker_registry, get_keys
from .serializers import resolve_serializer
@@ -55,9 +60,11 @@ from .serializers import resolve_serializer
try:
from setproctitle import setproctitle as setprocname
except ImportError:
+
def setprocname(*args, **kwargs): # noqa
pass
+
green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue')
@@ -69,10 +76,9 @@ class StopRequested(Exception):
pass
-
-_signames = dict((getattr(signal, signame), signame)
- for signame in dir(signal)
- if signame.startswith('SIG') and '_' not in signame)
+_signames = dict(
+ (getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith('SIG') and '_' not in signame
+)
def signal_name(signum):
@@ -118,7 +124,7 @@ class Worker:
job_class: Optional[Type['Job']] = None,
queue_class: Optional[Type['Queue']] = None,
queue: Optional['Queue'] = None,
- serializer=None
+ serializer=None,
) -> List['Worker']:
"""Returns an iterable of all Workers.
@@ -131,11 +137,12 @@ class Worker:
connection = get_current_connection()
worker_keys = get_keys(queue=queue, connection=connection)
- workers = [cls.find_by_key(as_text(key),
- connection=connection,
- job_class=job_class,
- queue_class=queue_class, serializer=serializer)
- for key in worker_keys]
+ workers = [
+ cls.find_by_key(
+ as_text(key), connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer
+ )
+ for key in worker_keys
+ ]
return compact(workers)
@classmethod
@@ -148,9 +155,8 @@ class Worker:
Returns:
list_keys (List[str]): A list of worker keys
- """
- return [as_text(key)
- for key in get_keys(queue=queue, connection=connection)]
+ """
+ return [as_text(key) for key in get_keys(queue=queue, connection=connection)]
@classmethod
def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None):
@@ -166,8 +172,14 @@ class Worker:
return len(get_keys(queue=queue, connection=connection))
@classmethod
- def find_by_key(cls, worker_key: str, connection: Optional['Redis'] = None, job_class: Type['Job'] = None,
- queue_class: Type['Queue'] = None, serializer=None):
+ def find_by_key(
+ cls,
+ worker_key: str,
+ connection: Optional['Redis'] = None,
+ job_class: Type['Job'] = None,
+ queue_class: Type['Queue'] = None,
+ serializer=None,
+ ):
"""Returns a Worker instance, based on the naming conventions for
naming the internal Redis keys. Can be used to reverse-lookup Workers
by their Redis keys.
@@ -182,22 +194,39 @@ class Worker:
connection.srem(cls.redis_workers_keys, worker_key)
return None
- name = worker_key[len(prefix):]
- worker = cls([], name, connection=connection, job_class=job_class,
- queue_class=queue_class, prepare_for_work=False, serializer=serializer)
+ name = worker_key[len(prefix) :]
+ worker = cls(
+ [],
+ name,
+ connection=connection,
+ job_class=job_class,
+ queue_class=queue_class,
+ prepare_for_work=False,
+ serializer=serializer,
+ )
worker.refresh()
return worker
- def __init__(self, queues, name: Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL,
- connection: Optional['Redis'] = None, exc_handler=None, exception_handlers=None,
- default_worker_ttl=DEFAULT_WORKER_TTL, job_class: Type['Job'] = None,
- queue_class=None, log_job_description: bool = True,
- job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
- disable_default_exception_handler: bool = False,
- prepare_for_work: bool = True, serializer=None): # noqa
-
+ def __init__(
+ self,
+ queues,
+ name: Optional[str] = None,
+ default_result_ttl=DEFAULT_RESULT_TTL,
+ connection: Optional['Redis'] = None,
+ exc_handler=None,
+ exception_handlers=None,
+ default_worker_ttl=DEFAULT_WORKER_TTL,
+ job_class: Type['Job'] = None,
+ queue_class=None,
+ log_job_description: bool = True,
+ job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
+ disable_default_exception_handler: bool = False,
+ prepare_for_work: bool = True,
+ serializer=None,
+ ): # noqa
+
connection = self._set_connection(connection, default_worker_ttl)
self.connection = connection
self.redis_server_version = None
@@ -208,11 +237,12 @@ class Worker:
self.python_version = sys.version
self.serializer = resolve_serializer(serializer)
- queues = [self.queue_class(name=q,
- connection=connection,
- job_class=self.job_class, serializer=self.serializer)
- if isinstance(q, str) else q
- for q in ensure_list(queues)]
+ queues = [
+ self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer)
+ if isinstance(q, str)
+ else q
+ for q in ensure_list(queues)
+ ]
self.name: str = name or uuid4().hex
self.queues = queues
@@ -250,24 +280,14 @@ class Worker:
try:
connection.client_setname(self.name)
except redis.exceptions.ResponseError:
- warnings.warn(
- 'CLIENT SETNAME command not supported, setting ip_address to unknown',
- Warning
- )
+ warnings.warn('CLIENT SETNAME command not supported, setting ip_address to unknown', Warning)
self.ip_address = 'unknown'
else:
- client_adresses = [
- client['addr']
- for client in connection.client_list()
- if client['name'] == self.name
- ]
+ client_adresses = [client['addr'] for client in connection.client_list() if client['name'] == self.name]
if len(client_adresses) > 0:
self.ip_address = client_adresses[0]
else:
- warnings.warn(
- 'CLIENT LIST command not supported, setting ip_address to unknown',
- Warning
- )
+ warnings.warn('CLIENT LIST command not supported, setting ip_address to unknown', Warning)
self.ip_address = 'unknown'
else:
self.hostname = None
@@ -361,8 +381,7 @@ class Worker:
def register_birth(self):
"""Registers its own birth."""
self.log.debug('Registering birth of worker %s', self.name)
- if self.connection.exists(self.key) and \
- not self.connection.hexists(self.key, 'death'):
+ if self.connection.exists(self.key) and not self.connection.hexists(self.key, 'death'):
msg = 'There exists an active worker named {0!r} already'
raise ValueError(msg.format(self.name))
key = self.key
@@ -436,10 +455,7 @@ class Worker:
def _set_state(self, state):
"""Raise a DeprecationWarning if ``worker.state = X`` is used"""
- warnings.warn(
- "worker.state is deprecated, use worker.set_state() instead.",
- DeprecationWarning
- )
+ warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning)
self.set_state(state)
def get_state(self):
@@ -447,10 +463,7 @@ class Worker:
def _get_state(self):
"""Raise a DeprecationWarning if ``worker.state == X`` is used"""
- warnings.warn(
- "worker.state is deprecated, use worker.get_state() instead.",
- DeprecationWarning
- )
+ warnings.warn("worker.state is deprecated, use worker.get_state() instead.", DeprecationWarning)
return self.get_state()
state = property(_get_state, _set_state)
@@ -516,8 +529,7 @@ class Worker:
return pid, stat
def request_force_stop(self, signum, frame):
- """Terminates the application (cold shutdown).
- """
+ """Terminates the application (cold shutdown)."""
self.log.warning('Cold shut down')
# Take down the horse with the worker
@@ -547,8 +559,7 @@ class Worker:
if self.get_state() == WorkerStatus.BUSY:
self._stop_requested = True
self.set_shutdown_requested_date()
- self.log.debug('Stopping after current horse is finished. '
- 'Press Ctrl+C again for a cold shutdown.')
+ self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.')
if self.scheduler:
self.stop_scheduler()
else:
@@ -614,8 +625,15 @@ class Worker:
def reorder_queues(self, reference_queue):
pass
- def work(self, burst: bool = False, logging_level: str = "INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
- log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler: bool = False):
+ def work(
+ self,
+ burst: bool = False,
+ logging_level: str = "INFO",
+ date_format=DEFAULT_LOGGING_DATE_FORMAT,
+ log_format=DEFAULT_LOGGING_FORMAT,
+ max_jobs=None,
+ with_scheduler: bool = False,
+ ):
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
@@ -635,8 +653,13 @@ class Worker:
if with_scheduler:
self.scheduler = RQScheduler(
- self.queues, connection=self.connection, logging_level=logging_level,
- date_format=date_format, log_format=log_format, serializer=self.serializer)
+ self.queues,
+ connection=self.connection,
+ logging_level=logging_level,
+ date_format=date_format,
+ log_format=log_format,
+ serializer=self.serializer,
+ )
self.scheduler.acquire_locks()
# If lock is acquired, start scheduler
if self.scheduler.acquired_locks:
@@ -676,10 +699,7 @@ class Worker:
completed_jobs += 1
if max_jobs is not None:
if completed_jobs >= max_jobs:
- self.log.info(
- "Worker %s: finished executing %d jobs, quitting",
- self.key, completed_jobs
- )
+ self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs)
break
except redis.exceptions.TimeoutError:
@@ -694,10 +714,7 @@ class Worker:
raise
except: # noqa
- self.log.error(
- 'Worker %s: found an unhandled exception, quitting...',
- self.key, exc_info=True
- )
+ self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True)
break
finally:
if not self.is_horse:
@@ -736,19 +753,20 @@ class Worker:
self.run_maintenance_tasks()
self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}")
- result = self.queue_class.dequeue_any(self._ordered_queues, timeout,
- connection=self.connection,
- job_class=self.job_class,
- serializer=self.serializer)
+ result = self.queue_class.dequeue_any(
+ self._ordered_queues,
+ timeout,
+ connection=self.connection,
+ job_class=self.job_class,
+ serializer=self.serializer,
+ )
self.log.debug(f"Dequeued job {result[1]} from {result[0]}")
if result is not None:
job, queue = result
job.redis_server_version = self.get_redis_server_version()
if self.log_job_description:
- self.log.info(
- '%s: %s (%s)', green(queue.name),
- blue(job.description), job.id)
+ self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id)
else:
self.log.info('%s: %s', green(queue.name), job.id)
@@ -756,8 +774,9 @@ class Worker:
except DequeueTimeout:
pass
except redis.exceptions.ConnectionError as conn_err:
- self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...',
- conn_err, connection_wait_time)
+ self.log.error(
+ 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time
+ )
time.sleep(connection_wait_time)
connection_wait_time *= self.exponential_backoff_factor
connection_wait_time = min(connection_wait_time, self.max_connection_wait_time)
@@ -782,18 +801,44 @@ class Worker:
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
- self.log.debug('Sent heartbeat to prevent worker timeout. '
- 'Next one should arrive within %s seconds.', timeout)
+ self.log.debug(
+ 'Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within %s seconds.', timeout
+ )
def refresh(self):
data = self.connection.hmget(
- self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
- 'birth', 'failed_job_count', 'successful_job_count', 'total_working_time',
- 'current_job_working_time', 'hostname', 'ip_address', 'pid', 'version', 'python_version',
+ self.key,
+ 'queues',
+ 'state',
+ 'current_job',
+ 'last_heartbeat',
+ 'birth',
+ 'failed_job_count',
+ 'successful_job_count',
+ 'total_working_time',
+ 'current_job_working_time',
+ 'hostname',
+ 'ip_address',
+ 'pid',
+ 'version',
+ 'python_version',
)
- (queues, state, job_id, last_heartbeat, birth, failed_job_count,
- successful_job_count, total_working_time, current_job_working_time,
- hostname, ip_address, pid, version, python_version) = data
+ (
+ queues,
+ state,
+ job_id,
+ last_heartbeat,
+ birth,
+ failed_job_count,
+ successful_job_count,
+ total_working_time,
+ current_job_working_time,
+ hostname,
+ ip_address,
+ pid,
+ version,
+ python_version,
+ ) = data
queues = as_text(queues)
self.hostname = as_text(hostname)
self.ip_address = as_text(ip_address)
@@ -820,10 +865,12 @@ class Worker:
self.current_job_working_time = float(as_text(current_job_working_time))
if queues:
- self.queues = [self.queue_class(queue,
- connection=self.connection,
- job_class=self.job_class, serializer=self.serializer)
- for queue in queues.split(',')]
+ self.queues = [
+ self.queue_class(
+ queue, connection=self.connection, job_class=self.job_class, serializer=self.serializer
+ )
+ for queue in queues.split(',')
+ ]
def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
@@ -834,12 +881,10 @@ class Worker:
connection.hincrby(self.key, 'successful_job_count', 1)
def increment_total_working_time(self, job_execution_time, pipeline):
- pipeline.hincrbyfloat(self.key, 'total_working_time',
- job_execution_time.total_seconds())
+ pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds())
def fork_work_horse(self, job: 'Job', queue: 'Queue'):
- """Spawns a work horse to perform the actual work and passes it a job.
- """
+ """Spawns a work horse to perform the actual work and passes it a job."""
child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
@@ -909,24 +954,20 @@ class Worker:
elif self._stopped_job_id == job.id:
# Work-horse killed deliberately
self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
- self.handle_job_failure(
- job, queue=queue,
- exc_string="Job stopped by user, work-horse terminated."
- )
+ self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()
# Unhandled failure: move the job to the failed queue
- self.log.warning((
- 'Moving job to FailedJobRegistry '
- '(work-horse terminated unexpectedly; waitpid returned {})'
- ).format(ret_val))
+ self.log.warning(
+ ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format(
+ ret_val
+ )
+ )
self.handle_job_failure(
- job, queue=queue,
- exc_string="Work-horse was terminated unexpectedly "
- "(waitpid returned %s)" % ret_val
+ job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val
)
def execute_job(self, job: 'Job', queue: 'Queue'):
@@ -1009,8 +1050,7 @@ class Worker:
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
- def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None,
- exc_string=''):
+ def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
"""
Handles the failure or an executing job by:
1. Setting the job status to failed
@@ -1023,10 +1063,7 @@ class Worker:
with self.connection.pipeline() as pipeline:
if started_job_registry is None:
started_job_registry = StartedJobRegistry(
- job.origin,
- self.connection,
- job_class=self.job_class,
- serializer=self.serializer
+ job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
)
# check whether a job was stopped intentionally and set the job
@@ -1045,14 +1082,19 @@ class Worker:
started_job_registry.remove(job, pipeline=pipeline)
if not self.disable_default_exception_handler and not retry:
- failed_job_registry = FailedJobRegistry(job.origin, job.connection,
- job_class=self.job_class, serializer=job.serializer)
+ failed_job_registry = FailedJobRegistry(
+ job.origin, job.connection, job_class=self.job_class, serializer=job.serializer
+ )
# Exception should be saved in job hash if server
# doesn't support Redis streams
_save_exc_to_job = not self.supports_redis_streams
- failed_job_registry.add(job, ttl=job.failure_ttl,
- exc_string=exc_string, pipeline=pipeline,
- _save_exc_to_job=_save_exc_to_job)
+ failed_job_registry.add(
+ job,
+ ttl=job.failure_ttl,
+ exc_string=exc_string,
+ pipeline=pipeline,
+ _save_exc_to_job=_save_exc_to_job,
+ )
if self.supports_redis_streams:
Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline)
with suppress(redis.exceptions.ConnectionError):
@@ -1061,9 +1103,7 @@ class Worker:
self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline)
if job.started_at and job.ended_at:
- self.increment_total_working_time(
- job.ended_at - job.started_at, pipeline
- )
+ self.increment_total_working_time(job.ended_at - job.started_at, pipeline)
if retry:
job.retry(queue, pipeline)
@@ -1099,9 +1139,7 @@ class Worker:
self.set_current_job_id(None, pipeline=pipeline)
self.increment_successful_job_count(pipeline=pipeline)
- self.increment_total_working_time(
- job.ended_at - job.started_at, pipeline # type: ignore
- )
+ self.increment_total_working_time(job.ended_at - job.started_at, pipeline) # type: ignore
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
@@ -1111,16 +1149,15 @@ class Worker:
# doesn't support Redis streams
include_result = not self.supports_redis_streams
# Don't clobber user's meta dictionary!
- job.save(pipeline=pipeline, include_meta=False,
- include_result=include_result)
+ job.save(pipeline=pipeline, include_meta=False, include_result=include_result)
if self.supports_redis_streams:
- Result.create(job, Result.Type.SUCCESSFUL, return_value=job._result,
- ttl=result_ttl, pipeline=pipeline)
+ Result.create(
+ job, Result.Type.SUCCESSFUL, return_value=job._result, ttl=result_ttl, pipeline=pipeline
+ )
finished_job_registry = queue.finished_job_registry
finished_job_registry.add(job, result_ttl, pipeline)
- job.cleanup(result_ttl, pipeline=pipeline,
- remove_from_queue=False)
+ job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
self.log.debug('Removing job %s from StartedJobRegistry', job.id)
started_job_registry.remove(job, pipeline=pipeline)
@@ -1172,9 +1209,7 @@ class Worker:
if job.success_callback:
self.execute_success_callback(job, rv)
- self.handle_job_success(job=job,
- queue=queue,
- started_job_registry=started_job_registry)
+ self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry)
except: # NOQA
self.log.debug(f"Job {job.id} raised an exception.")
job.ended_at = utcnow()
@@ -1185,15 +1220,13 @@ class Worker:
try:
self.execute_failure_callback(job)
except: # noqa
- self.log.error(
- 'Worker %s: error while executing failure callback',
- self.key, exc_info=True
- )
+ self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True)
exc_info = sys.exc_info()
exc_string = ''.join(traceback.format_exception(*exc_info))
- self.handle_job_failure(job=job, exc_string=exc_string, queue=queue,
- started_job_registry=started_job_registry)
+ self.handle_job_failure(
+ job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry
+ )
self.handle_exception(job, *exc_info)
return False
@@ -1237,10 +1270,9 @@ class Worker:
# the properties below should be safe however
extra.update({'queue': job.origin, 'job_id': job.id})
-
+
# func_name
- self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string,
- extra=extra)
+ self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra)
for handler in self._exc_handlers:
self.log.debug('Invoking exception handler %s', handler)
@@ -1322,6 +1354,7 @@ class HerokuWorker(Worker):
* sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
causes the horse to crash `imminent_shutdown_delay` seconds later
"""
+
imminent_shutdown_delay = 6
frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace']
@@ -1335,10 +1368,7 @@ class HerokuWorker(Worker):
def handle_warm_shutdown_request(self):
"""If horse is alive send it SIGRTMIN"""
if self.horse_pid != 0:
- self.log.info(
- 'Worker %s: warm shut down requested, sending horse SIGRTMIN signal',
- self.key
- )
+ self.log.info('Worker %s: warm shut down requested, sending horse SIGRTMIN signal', self.key)
self.kill_horse(sig=signal.SIGRTMIN)
else:
self.log.warning('Warm shut down requested, no horse found')
@@ -1348,8 +1378,9 @@ class HerokuWorker(Worker):
self.log.warning('Imminent shutdown, raising ShutDownImminentException immediately')
self.request_force_stop_sigrtmin(signum, frame)
else:
- self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds',
- self.imminent_shutdown_delay)
+ self.log.warning(
+ 'Imminent shutdown, raising ShutDownImminentException in %d seconds', self.imminent_shutdown_delay
+ )
signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin)
signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin)
signal.alarm(self.imminent_shutdown_delay)
@@ -1367,7 +1398,7 @@ class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
- self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1]
+ self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
class RandomWorker(Worker):