summaryrefslogtreecommitdiff
path: root/rq/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'rq/worker.py')
-rw-r--r--rq/worker.py337
1 files changed, 184 insertions, 153 deletions
diff --git a/rq/worker.py b/rq/worker.py
index 35355c7..06a707e 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -34,9 +34,15 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .utils import as_text
from .connections import get_current_connection, push_connection, pop_connection
-from .defaults import (CALLBACK_TIMEOUT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL,
- DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
- DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
+from .defaults import (
+ CALLBACK_TIMEOUT,
+ DEFAULT_MAINTENANCE_TASK_INTERVAL,
+ DEFAULT_RESULT_TTL,
+ DEFAULT_WORKER_TTL,
+ DEFAULT_JOB_MONITORING_INTERVAL,
+ DEFAULT_LOGGING_FORMAT,
+ DEFAULT_LOGGING_DATE_FORMAT,
+)
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import setup_loghandlers
@@ -46,8 +52,7 @@ from .results import Result
from .scheduler import RQScheduler
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
-from .utils import (backend_class, ensure_list, get_version,
- make_colorizer, utcformat, utcnow, utcparse, compact)
+from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact
from .version import VERSION
from .worker_registration import clean_worker_registry, get_keys
from .serializers import resolve_serializer
@@ -55,9 +60,11 @@ from .serializers import resolve_serializer
try:
from setproctitle import setproctitle as setprocname
except ImportError:
+
def setprocname(*args, **kwargs): # noqa
pass
+
green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue')
@@ -69,10 +76,9 @@ class StopRequested(Exception):
pass
-
-_signames = dict((getattr(signal, signame), signame)
- for signame in dir(signal)
- if signame.startswith('SIG') and '_' not in signame)
+_signames = dict(
+ (getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith('SIG') and '_' not in signame
+)
def signal_name(signum):
@@ -118,7 +124,7 @@ class Worker:
job_class: Optional[Type['Job']] = None,
queue_class: Optional[Type['Queue']] = None,
queue: Optional['Queue'] = None,
- serializer=None
+ serializer=None,
) -> List['Worker']:
"""Returns an iterable of all Workers.
@@ -131,11 +137,12 @@ class Worker:
connection = get_current_connection()
worker_keys = get_keys(queue=queue, connection=connection)
- workers = [cls.find_by_key(as_text(key),
- connection=connection,
- job_class=job_class,
- queue_class=queue_class, serializer=serializer)
- for key in worker_keys]
+ workers = [
+ cls.find_by_key(
+ as_text(key), connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer
+ )
+ for key in worker_keys
+ ]
return compact(workers)
@classmethod
@@ -148,9 +155,8 @@ class Worker:
Returns:
list_keys (List[str]): A list of worker keys
- """
- return [as_text(key)
- for key in get_keys(queue=queue, connection=connection)]
+ """
+ return [as_text(key) for key in get_keys(queue=queue, connection=connection)]
@classmethod
def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None):
@@ -166,8 +172,14 @@ class Worker:
return len(get_keys(queue=queue, connection=connection))
@classmethod
- def find_by_key(cls, worker_key: str, connection: Optional['Redis'] = None, job_class: Type['Job'] = None,
- queue_class: Type['Queue'] = None, serializer=None):
+ def find_by_key(
+ cls,
+ worker_key: str,
+ connection: Optional['Redis'] = None,
+ job_class: Type['Job'] = None,
+ queue_class: Type['Queue'] = None,
+ serializer=None,
+ ):
"""Returns a Worker instance, based on the naming conventions for
naming the internal Redis keys. Can be used to reverse-lookup Workers
by their Redis keys.
@@ -182,22 +194,39 @@ class Worker:
connection.srem(cls.redis_workers_keys, worker_key)
return None
- name = worker_key[len(prefix):]
- worker = cls([], name, connection=connection, job_class=job_class,
- queue_class=queue_class, prepare_for_work=False, serializer=serializer)
+ name = worker_key[len(prefix) :]
+ worker = cls(
+ [],
+ name,
+ connection=connection,
+ job_class=job_class,
+ queue_class=queue_class,
+ prepare_for_work=False,
+ serializer=serializer,
+ )
worker.refresh()
return worker
- def __init__(self, queues, name: Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL,
- connection: Optional['Redis'] = None, exc_handler=None, exception_handlers=None,
- default_worker_ttl=DEFAULT_WORKER_TTL, job_class: Type['Job'] = None,
- queue_class=None, log_job_description: bool = True,
- job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
- disable_default_exception_handler: bool = False,
- prepare_for_work: bool = True, serializer=None): # noqa
-
+ def __init__(
+ self,
+ queues,
+ name: Optional[str] = None,
+ default_result_ttl=DEFAULT_RESULT_TTL,
+ connection: Optional['Redis'] = None,
+ exc_handler=None,
+ exception_handlers=None,
+ default_worker_ttl=DEFAULT_WORKER_TTL,
+ job_class: Type['Job'] = None,
+ queue_class=None,
+ log_job_description: bool = True,
+ job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
+ disable_default_exception_handler: bool = False,
+ prepare_for_work: bool = True,
+ serializer=None,
+ ): # noqa
+
connection = self._set_connection(connection, default_worker_ttl)
self.connection = connection
self.redis_server_version = None
@@ -208,11 +237,12 @@ class Worker:
self.python_version = sys.version
self.serializer = resolve_serializer(serializer)
- queues = [self.queue_class(name=q,
- connection=connection,
- job_class=self.job_class, serializer=self.serializer)
- if isinstance(q, str) else q
- for q in ensure_list(queues)]
+ queues = [
+ self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer)
+ if isinstance(q, str)
+ else q
+ for q in ensure_list(queues)
+ ]
self.name: str = name or uuid4().hex
self.queues = queues
@@ -250,24 +280,14 @@ class Worker:
try:
connection.client_setname(self.name)
except redis.exceptions.ResponseError:
- warnings.warn(
- 'CLIENT SETNAME command not supported, setting ip_address to unknown',
- Warning
- )
+ warnings.warn('CLIENT SETNAME command not supported, setting ip_address to unknown', Warning)
self.ip_address = 'unknown'
else:
- client_adresses = [
- client['addr']
- for client in connection.client_list()
- if client['name'] == self.name
- ]
+ client_adresses = [client['addr'] for client in connection.client_list() if client['name'] == self.name]
if len(client_adresses) > 0:
self.ip_address = client_adresses[0]
else:
- warnings.warn(
- 'CLIENT LIST command not supported, setting ip_address to unknown',
- Warning
- )
+ warnings.warn('CLIENT LIST command not supported, setting ip_address to unknown', Warning)
self.ip_address = 'unknown'
else:
self.hostname = None
@@ -361,8 +381,7 @@ class Worker:
def register_birth(self):
"""Registers its own birth."""
self.log.debug('Registering birth of worker %s', self.name)
- if self.connection.exists(self.key) and \
- not self.connection.hexists(self.key, 'death'):
+ if self.connection.exists(self.key) and not self.connection.hexists(self.key, 'death'):
msg = 'There exists an active worker named {0!r} already'
raise ValueError(msg.format(self.name))
key = self.key
@@ -436,10 +455,7 @@ class Worker:
def _set_state(self, state):
"""Raise a DeprecationWarning if ``worker.state = X`` is used"""
- warnings.warn(
- "worker.state is deprecated, use worker.set_state() instead.",
- DeprecationWarning
- )
+ warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning)
self.set_state(state)
def get_state(self):
@@ -447,10 +463,7 @@ class Worker:
def _get_state(self):
"""Raise a DeprecationWarning if ``worker.state == X`` is used"""
- warnings.warn(
- "worker.state is deprecated, use worker.get_state() instead.",
- DeprecationWarning
- )
+ warnings.warn("worker.state is deprecated, use worker.get_state() instead.", DeprecationWarning)
return self.get_state()
state = property(_get_state, _set_state)
@@ -516,8 +529,7 @@ class Worker:
return pid, stat
def request_force_stop(self, signum, frame):
- """Terminates the application (cold shutdown).
- """
+ """Terminates the application (cold shutdown)."""
self.log.warning('Cold shut down')
# Take down the horse with the worker
@@ -547,8 +559,7 @@ class Worker:
if self.get_state() == WorkerStatus.BUSY:
self._stop_requested = True
self.set_shutdown_requested_date()
- self.log.debug('Stopping after current horse is finished. '
- 'Press Ctrl+C again for a cold shutdown.')
+ self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.')
if self.scheduler:
self.stop_scheduler()
else:
@@ -614,8 +625,15 @@ class Worker:
def reorder_queues(self, reference_queue):
pass
- def work(self, burst: bool = False, logging_level: str = "INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
- log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler: bool = False):
+ def work(
+ self,
+ burst: bool = False,
+ logging_level: str = "INFO",
+ date_format=DEFAULT_LOGGING_DATE_FORMAT,
+ log_format=DEFAULT_LOGGING_FORMAT,
+ max_jobs=None,
+ with_scheduler: bool = False,
+ ):
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
@@ -635,8 +653,13 @@ class Worker:
if with_scheduler:
self.scheduler = RQScheduler(
- self.queues, connection=self.connection, logging_level=logging_level,
- date_format=date_format, log_format=log_format, serializer=self.serializer)
+ self.queues,
+ connection=self.connection,
+ logging_level=logging_level,
+ date_format=date_format,
+ log_format=log_format,
+ serializer=self.serializer,
+ )
self.scheduler.acquire_locks()
# If lock is acquired, start scheduler
if self.scheduler.acquired_locks:
@@ -676,10 +699,7 @@ class Worker:
completed_jobs += 1
if max_jobs is not None:
if completed_jobs >= max_jobs:
- self.log.info(
- "Worker %s: finished executing %d jobs, quitting",
- self.key, completed_jobs
- )
+ self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs)
break
except redis.exceptions.TimeoutError:
@@ -694,10 +714,7 @@ class Worker:
raise
except: # noqa
- self.log.error(
- 'Worker %s: found an unhandled exception, quitting...',
- self.key, exc_info=True
- )
+ self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True)
break
finally:
if not self.is_horse:
@@ -736,19 +753,20 @@ class Worker:
self.run_maintenance_tasks()
self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}")
- result = self.queue_class.dequeue_any(self._ordered_queues, timeout,
- connection=self.connection,
- job_class=self.job_class,
- serializer=self.serializer)
+ result = self.queue_class.dequeue_any(
+ self._ordered_queues,
+ timeout,
+ connection=self.connection,
+ job_class=self.job_class,
+ serializer=self.serializer,
+ )
self.log.debug(f"Dequeued job {result[1]} from {result[0]}")
if result is not None:
job, queue = result
job.redis_server_version = self.get_redis_server_version()
if self.log_job_description:
- self.log.info(
- '%s: %s (%s)', green(queue.name),
- blue(job.description), job.id)
+ self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id)
else:
self.log.info('%s: %s', green(queue.name), job.id)
@@ -756,8 +774,9 @@ class Worker:
except DequeueTimeout:
pass
except redis.exceptions.ConnectionError as conn_err:
- self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...',
- conn_err, connection_wait_time)
+ self.log.error(
+ 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time
+ )
time.sleep(connection_wait_time)
connection_wait_time *= self.exponential_backoff_factor
connection_wait_time = min(connection_wait_time, self.max_connection_wait_time)
@@ -782,18 +801,44 @@ class Worker:
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
- self.log.debug('Sent heartbeat to prevent worker timeout. '
- 'Next one should arrive within %s seconds.', timeout)
+ self.log.debug(
+ 'Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within %s seconds.', timeout
+ )
def refresh(self):
data = self.connection.hmget(
- self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
- 'birth', 'failed_job_count', 'successful_job_count', 'total_working_time',
- 'current_job_working_time', 'hostname', 'ip_address', 'pid', 'version', 'python_version',
+ self.key,
+ 'queues',
+ 'state',
+ 'current_job',
+ 'last_heartbeat',
+ 'birth',
+ 'failed_job_count',
+ 'successful_job_count',
+ 'total_working_time',
+ 'current_job_working_time',
+ 'hostname',
+ 'ip_address',
+ 'pid',
+ 'version',
+ 'python_version',
)
- (queues, state, job_id, last_heartbeat, birth, failed_job_count,
- successful_job_count, total_working_time, current_job_working_time,
- hostname, ip_address, pid, version, python_version) = data
+ (
+ queues,
+ state,
+ job_id,
+ last_heartbeat,
+ birth,
+ failed_job_count,
+ successful_job_count,
+ total_working_time,
+ current_job_working_time,
+ hostname,
+ ip_address,
+ pid,
+ version,
+ python_version,
+ ) = data
queues = as_text(queues)
self.hostname = as_text(hostname)
self.ip_address = as_text(ip_address)
@@ -820,10 +865,12 @@ class Worker:
self.current_job_working_time = float(as_text(current_job_working_time))
if queues:
- self.queues = [self.queue_class(queue,
- connection=self.connection,
- job_class=self.job_class, serializer=self.serializer)
- for queue in queues.split(',')]
+ self.queues = [
+ self.queue_class(
+ queue, connection=self.connection, job_class=self.job_class, serializer=self.serializer
+ )
+ for queue in queues.split(',')
+ ]
def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
@@ -834,12 +881,10 @@ class Worker:
connection.hincrby(self.key, 'successful_job_count', 1)
def increment_total_working_time(self, job_execution_time, pipeline):
- pipeline.hincrbyfloat(self.key, 'total_working_time',
- job_execution_time.total_seconds())
+ pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds())
def fork_work_horse(self, job: 'Job', queue: 'Queue'):
- """Spawns a work horse to perform the actual work and passes it a job.
- """
+ """Spawns a work horse to perform the actual work and passes it a job."""
child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
@@ -909,24 +954,20 @@ class Worker:
elif self._stopped_job_id == job.id:
# Work-horse killed deliberately
self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
- self.handle_job_failure(
- job, queue=queue,
- exc_string="Job stopped by user, work-horse terminated."
- )
+ self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()
# Unhandled failure: move the job to the failed queue
- self.log.warning((
- 'Moving job to FailedJobRegistry '
- '(work-horse terminated unexpectedly; waitpid returned {})'
- ).format(ret_val))
+ self.log.warning(
+ ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format(
+ ret_val
+ )
+ )
self.handle_job_failure(
- job, queue=queue,
- exc_string="Work-horse was terminated unexpectedly "
- "(waitpid returned %s)" % ret_val
+ job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val
)
def execute_job(self, job: 'Job', queue: 'Queue'):
@@ -1009,8 +1050,7 @@ class Worker:
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
- def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None,
- exc_string=''):
+ def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
"""
Handles the failure or an executing job by:
1. Setting the job status to failed
@@ -1023,10 +1063,7 @@ class Worker:
with self.connection.pipeline() as pipeline:
if started_job_registry is None:
started_job_registry = StartedJobRegistry(
- job.origin,
- self.connection,
- job_class=self.job_class,
- serializer=self.serializer
+ job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
)
# check whether a job was stopped intentionally and set the job
@@ -1045,14 +1082,19 @@ class Worker:
started_job_registry.remove(job, pipeline=pipeline)
if not self.disable_default_exception_handler and not retry:
- failed_job_registry = FailedJobRegistry(job.origin, job.connection,
- job_class=self.job_class, serializer=job.serializer)
+ failed_job_registry = FailedJobRegistry(
+ job.origin, job.connection, job_class=self.job_class, serializer=job.serializer
+ )
# Exception should be saved in job hash if server
# doesn't support Redis streams
_save_exc_to_job = not self.supports_redis_streams
- failed_job_registry.add(job, ttl=job.failure_ttl,
- exc_string=exc_string, pipeline=pipeline,
- _save_exc_to_job=_save_exc_to_job)
+ failed_job_registry.add(
+ job,
+ ttl=job.failure_ttl,
+ exc_string=exc_string,
+ pipeline=pipeline,
+ _save_exc_to_job=_save_exc_to_job,
+ )
if self.supports_redis_streams:
Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline)
with suppress(redis.exceptions.ConnectionError):
@@ -1061,9 +1103,7 @@ class Worker:
self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline)
if job.started_at and job.ended_at:
- self.increment_total_working_time(
- job.ended_at - job.started_at, pipeline
- )
+ self.increment_total_working_time(job.ended_at - job.started_at, pipeline)
if retry:
job.retry(queue, pipeline)
@@ -1099,9 +1139,7 @@ class Worker:
self.set_current_job_id(None, pipeline=pipeline)
self.increment_successful_job_count(pipeline=pipeline)
- self.increment_total_working_time(
- job.ended_at - job.started_at, pipeline # type: ignore
- )
+ self.increment_total_working_time(job.ended_at - job.started_at, pipeline) # type: ignore
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
@@ -1111,16 +1149,15 @@ class Worker:
# doesn't support Redis streams
include_result = not self.supports_redis_streams
# Don't clobber user's meta dictionary!
- job.save(pipeline=pipeline, include_meta=False,
- include_result=include_result)
+ job.save(pipeline=pipeline, include_meta=False, include_result=include_result)
if self.supports_redis_streams:
- Result.create(job, Result.Type.SUCCESSFUL, return_value=job._result,
- ttl=result_ttl, pipeline=pipeline)
+ Result.create(
+ job, Result.Type.SUCCESSFUL, return_value=job._result, ttl=result_ttl, pipeline=pipeline
+ )
finished_job_registry = queue.finished_job_registry
finished_job_registry.add(job, result_ttl, pipeline)
- job.cleanup(result_ttl, pipeline=pipeline,
- remove_from_queue=False)
+ job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
self.log.debug('Removing job %s from StartedJobRegistry', job.id)
started_job_registry.remove(job, pipeline=pipeline)
@@ -1172,9 +1209,7 @@ class Worker:
if job.success_callback:
self.execute_success_callback(job, rv)
- self.handle_job_success(job=job,
- queue=queue,
- started_job_registry=started_job_registry)
+ self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry)
except: # NOQA
self.log.debug(f"Job {job.id} raised an exception.")
job.ended_at = utcnow()
@@ -1185,15 +1220,13 @@ class Worker:
try:
self.execute_failure_callback(job)
except: # noqa
- self.log.error(
- 'Worker %s: error while executing failure callback',
- self.key, exc_info=True
- )
+ self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True)
exc_info = sys.exc_info()
exc_string = ''.join(traceback.format_exception(*exc_info))
- self.handle_job_failure(job=job, exc_string=exc_string, queue=queue,
- started_job_registry=started_job_registry)
+ self.handle_job_failure(
+ job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry
+ )
self.handle_exception(job, *exc_info)
return False
@@ -1237,10 +1270,9 @@ class Worker:
# the properties below should be safe however
extra.update({'queue': job.origin, 'job_id': job.id})
-
+
# func_name
- self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string,
- extra=extra)
+ self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra)
for handler in self._exc_handlers:
self.log.debug('Invoking exception handler %s', handler)
@@ -1322,6 +1354,7 @@ class HerokuWorker(Worker):
* sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
causes the horse to crash `imminent_shutdown_delay` seconds later
"""
+
imminent_shutdown_delay = 6
frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace']
@@ -1335,10 +1368,7 @@ class HerokuWorker(Worker):
def handle_warm_shutdown_request(self):
"""If horse is alive send it SIGRTMIN"""
if self.horse_pid != 0:
- self.log.info(
- 'Worker %s: warm shut down requested, sending horse SIGRTMIN signal',
- self.key
- )
+ self.log.info('Worker %s: warm shut down requested, sending horse SIGRTMIN signal', self.key)
self.kill_horse(sig=signal.SIGRTMIN)
else:
self.log.warning('Warm shut down requested, no horse found')
@@ -1348,8 +1378,9 @@ class HerokuWorker(Worker):
self.log.warning('Imminent shutdown, raising ShutDownImminentException immediately')
self.request_force_stop_sigrtmin(signum, frame)
else:
- self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds',
- self.imminent_shutdown_delay)
+ self.log.warning(
+ 'Imminent shutdown, raising ShutDownImminentException in %d seconds', self.imminent_shutdown_delay
+ )
signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin)
signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin)
signal.alarm(self.imminent_shutdown_delay)
@@ -1367,7 +1398,7 @@ class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
- self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1]
+ self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
class RandomWorker(Worker):