diff options
Diffstat (limited to 'rq/worker.py')
-rw-r--r-- | rq/worker.py | 337 |
1 files changed, 184 insertions, 153 deletions
diff --git a/rq/worker.py b/rq/worker.py index 35355c7..06a707e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -34,9 +34,15 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command from .utils import as_text from .connections import get_current_connection, push_connection, pop_connection -from .defaults import (CALLBACK_TIMEOUT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL, - DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) +from .defaults import ( + CALLBACK_TIMEOUT, + DEFAULT_MAINTENANCE_TASK_INTERVAL, + DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, + DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_FORMAT, + DEFAULT_LOGGING_DATE_FORMAT, +) from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers @@ -46,8 +52,7 @@ from .results import Result from .scheduler import RQScheduler from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import (backend_class, ensure_list, get_version, - make_colorizer, utcformat, utcnow, utcparse, compact) +from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact from .version import VERSION from .worker_registration import clean_worker_registry, get_keys from .serializers import resolve_serializer @@ -55,9 +60,11 @@ from .serializers import resolve_serializer try: from setproctitle import setproctitle as setprocname except ImportError: + def setprocname(*args, **kwargs): # noqa pass + green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') blue = make_colorizer('darkblue') @@ -69,10 +76,9 @@ class StopRequested(Exception): pass - -_signames = dict((getattr(signal, signame), signame) - for signame in dir(signal) - if signame.startswith('SIG') and '_' not in signame) +_signames = dict( + (getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith('SIG') and '_' not in signame +) def signal_name(signum): @@ -118,7 +124,7 @@ class Worker: job_class: Optional[Type['Job']] = None, queue_class: Optional[Type['Queue']] = None, queue: Optional['Queue'] = None, - serializer=None + serializer=None, ) -> List['Worker']: """Returns an iterable of all Workers. @@ -131,11 +137,12 @@ class Worker: connection = get_current_connection() worker_keys = get_keys(queue=queue, connection=connection) - workers = [cls.find_by_key(as_text(key), - connection=connection, - job_class=job_class, - queue_class=queue_class, serializer=serializer) - for key in worker_keys] + workers = [ + cls.find_by_key( + as_text(key), connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer + ) + for key in worker_keys + ] return compact(workers) @classmethod @@ -148,9 +155,8 @@ class Worker: Returns: list_keys (List[str]): A list of worker keys - """ - return [as_text(key) - for key in get_keys(queue=queue, connection=connection)] + """ + return [as_text(key) for key in get_keys(queue=queue, connection=connection)] @classmethod def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None): @@ -166,8 +172,14 @@ class Worker: return len(get_keys(queue=queue, connection=connection)) @classmethod - def find_by_key(cls, worker_key: str, connection: Optional['Redis'] = None, job_class: Type['Job'] = None, - queue_class: Type['Queue'] = None, serializer=None): + def find_by_key( + cls, + worker_key: str, + connection: Optional['Redis'] = None, + job_class: Type['Job'] = None, + queue_class: Type['Queue'] = None, + serializer=None, + ): """Returns a Worker instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Workers by their Redis keys. @@ -182,22 +194,39 @@ class Worker: connection.srem(cls.redis_workers_keys, worker_key) return None - name = worker_key[len(prefix):] - worker = cls([], name, connection=connection, job_class=job_class, - queue_class=queue_class, prepare_for_work=False, serializer=serializer) + name = worker_key[len(prefix) :] + worker = cls( + [], + name, + connection=connection, + job_class=job_class, + queue_class=queue_class, + prepare_for_work=False, + serializer=serializer, + ) worker.refresh() return worker - def __init__(self, queues, name: Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL, - connection: Optional['Redis'] = None, exc_handler=None, exception_handlers=None, - default_worker_ttl=DEFAULT_WORKER_TTL, job_class: Type['Job'] = None, - queue_class=None, log_job_description: bool = True, - job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, - disable_default_exception_handler: bool = False, - prepare_for_work: bool = True, serializer=None): # noqa - + def __init__( + self, + queues, + name: Optional[str] = None, + default_result_ttl=DEFAULT_RESULT_TTL, + connection: Optional['Redis'] = None, + exc_handler=None, + exception_handlers=None, + default_worker_ttl=DEFAULT_WORKER_TTL, + job_class: Type['Job'] = None, + queue_class=None, + log_job_description: bool = True, + job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, + disable_default_exception_handler: bool = False, + prepare_for_work: bool = True, + serializer=None, + ): # noqa + connection = self._set_connection(connection, default_worker_ttl) self.connection = connection self.redis_server_version = None @@ -208,11 +237,12 @@ class Worker: self.python_version = sys.version self.serializer = resolve_serializer(serializer) - queues = [self.queue_class(name=q, - connection=connection, - job_class=self.job_class, serializer=self.serializer) - if isinstance(q, str) else q - for q in ensure_list(queues)] + queues = [ + self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer) + if isinstance(q, str) + else q + for q in ensure_list(queues) + ] self.name: str = name or uuid4().hex self.queues = queues @@ -250,24 +280,14 @@ class Worker: try: connection.client_setname(self.name) except redis.exceptions.ResponseError: - warnings.warn( - 'CLIENT SETNAME command not supported, setting ip_address to unknown', - Warning - ) + warnings.warn('CLIENT SETNAME command not supported, setting ip_address to unknown', Warning) self.ip_address = 'unknown' else: - client_adresses = [ - client['addr'] - for client in connection.client_list() - if client['name'] == self.name - ] + client_adresses = [client['addr'] for client in connection.client_list() if client['name'] == self.name] if len(client_adresses) > 0: self.ip_address = client_adresses[0] else: - warnings.warn( - 'CLIENT LIST command not supported, setting ip_address to unknown', - Warning - ) + warnings.warn('CLIENT LIST command not supported, setting ip_address to unknown', Warning) self.ip_address = 'unknown' else: self.hostname = None @@ -361,8 +381,7 @@ class Worker: def register_birth(self): """Registers its own birth.""" self.log.debug('Registering birth of worker %s', self.name) - if self.connection.exists(self.key) and \ - not self.connection.hexists(self.key, 'death'): + if self.connection.exists(self.key) and not self.connection.hexists(self.key, 'death'): msg = 'There exists an active worker named {0!r} already' raise ValueError(msg.format(self.name)) key = self.key @@ -436,10 +455,7 @@ class Worker: def _set_state(self, state): """Raise a DeprecationWarning if ``worker.state = X`` is used""" - warnings.warn( - "worker.state is deprecated, use worker.set_state() instead.", - DeprecationWarning - ) + warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning) self.set_state(state) def get_state(self): @@ -447,10 +463,7 @@ class Worker: def _get_state(self): """Raise a DeprecationWarning if ``worker.state == X`` is used""" - warnings.warn( - "worker.state is deprecated, use worker.get_state() instead.", - DeprecationWarning - ) + warnings.warn("worker.state is deprecated, use worker.get_state() instead.", DeprecationWarning) return self.get_state() state = property(_get_state, _set_state) @@ -516,8 +529,7 @@ class Worker: return pid, stat def request_force_stop(self, signum, frame): - """Terminates the application (cold shutdown). - """ + """Terminates the application (cold shutdown).""" self.log.warning('Cold shut down') # Take down the horse with the worker @@ -547,8 +559,7 @@ class Worker: if self.get_state() == WorkerStatus.BUSY: self._stop_requested = True self.set_shutdown_requested_date() - self.log.debug('Stopping after current horse is finished. ' - 'Press Ctrl+C again for a cold shutdown.') + self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.') if self.scheduler: self.stop_scheduler() else: @@ -614,8 +625,15 @@ class Worker: def reorder_queues(self, reference_queue): pass - def work(self, burst: bool = False, logging_level: str = "INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler: bool = False): + def work( + self, + burst: bool = False, + logging_level: str = "INFO", + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + max_jobs=None, + with_scheduler: bool = False, + ): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -635,8 +653,13 @@ class Worker: if with_scheduler: self.scheduler = RQScheduler( - self.queues, connection=self.connection, logging_level=logging_level, - date_format=date_format, log_format=log_format, serializer=self.serializer) + self.queues, + connection=self.connection, + logging_level=logging_level, + date_format=date_format, + log_format=log_format, + serializer=self.serializer, + ) self.scheduler.acquire_locks() # If lock is acquired, start scheduler if self.scheduler.acquired_locks: @@ -676,10 +699,7 @@ class Worker: completed_jobs += 1 if max_jobs is not None: if completed_jobs >= max_jobs: - self.log.info( - "Worker %s: finished executing %d jobs, quitting", - self.key, completed_jobs - ) + self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs) break except redis.exceptions.TimeoutError: @@ -694,10 +714,7 @@ class Worker: raise except: # noqa - self.log.error( - 'Worker %s: found an unhandled exception, quitting...', - self.key, exc_info=True - ) + self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True) break finally: if not self.is_horse: @@ -736,19 +753,20 @@ class Worker: self.run_maintenance_tasks() self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}") - result = self.queue_class.dequeue_any(self._ordered_queues, timeout, - connection=self.connection, - job_class=self.job_class, - serializer=self.serializer) + result = self.queue_class.dequeue_any( + self._ordered_queues, + timeout, + connection=self.connection, + job_class=self.job_class, + serializer=self.serializer, + ) self.log.debug(f"Dequeued job {result[1]} from {result[0]}") if result is not None: job, queue = result job.redis_server_version = self.get_redis_server_version() if self.log_job_description: - self.log.info( - '%s: %s (%s)', green(queue.name), - blue(job.description), job.id) + self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id) else: self.log.info('%s: %s', green(queue.name), job.id) @@ -756,8 +774,9 @@ class Worker: except DequeueTimeout: pass except redis.exceptions.ConnectionError as conn_err: - self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...', - conn_err, connection_wait_time) + self.log.error( + 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time + ) time.sleep(connection_wait_time) connection_wait_time *= self.exponential_backoff_factor connection_wait_time = min(connection_wait_time, self.max_connection_wait_time) @@ -782,18 +801,44 @@ class Worker: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, timeout) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) - self.log.debug('Sent heartbeat to prevent worker timeout. ' - 'Next one should arrive within %s seconds.', timeout) + self.log.debug( + 'Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within %s seconds.', timeout + ) def refresh(self): data = self.connection.hmget( - self.key, 'queues', 'state', 'current_job', 'last_heartbeat', - 'birth', 'failed_job_count', 'successful_job_count', 'total_working_time', - 'current_job_working_time', 'hostname', 'ip_address', 'pid', 'version', 'python_version', + self.key, + 'queues', + 'state', + 'current_job', + 'last_heartbeat', + 'birth', + 'failed_job_count', + 'successful_job_count', + 'total_working_time', + 'current_job_working_time', + 'hostname', + 'ip_address', + 'pid', + 'version', + 'python_version', ) - (queues, state, job_id, last_heartbeat, birth, failed_job_count, - successful_job_count, total_working_time, current_job_working_time, - hostname, ip_address, pid, version, python_version) = data + ( + queues, + state, + job_id, + last_heartbeat, + birth, + failed_job_count, + successful_job_count, + total_working_time, + current_job_working_time, + hostname, + ip_address, + pid, + version, + python_version, + ) = data queues = as_text(queues) self.hostname = as_text(hostname) self.ip_address = as_text(ip_address) @@ -820,10 +865,12 @@ class Worker: self.current_job_working_time = float(as_text(current_job_working_time)) if queues: - self.queues = [self.queue_class(queue, - connection=self.connection, - job_class=self.job_class, serializer=self.serializer) - for queue in queues.split(',')] + self.queues = [ + self.queue_class( + queue, connection=self.connection, job_class=self.job_class, serializer=self.serializer + ) + for queue in queues.split(',') + ] def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection @@ -834,12 +881,10 @@ class Worker: connection.hincrby(self.key, 'successful_job_count', 1) def increment_total_working_time(self, job_execution_time, pipeline): - pipeline.hincrbyfloat(self.key, 'total_working_time', - job_execution_time.total_seconds()) + pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds()) def fork_work_horse(self, job: 'Job', queue: 'Queue'): - """Spawns a work horse to perform the actual work and passes it a job. - """ + """Spawns a work horse to perform the actual work and passes it a job.""" child_pid = os.fork() os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_JOB_ID'] = job.id @@ -909,24 +954,20 @@ class Worker: elif self._stopped_job_id == job.id: # Work-horse killed deliberately self.log.warning('Job stopped by user, moving job to FailedJobRegistry') - self.handle_job_failure( - job, queue=queue, - exc_string="Job stopped by user, work-horse terminated." - ) + self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.") elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: if not job.ended_at: job.ended_at = utcnow() # Unhandled failure: move the job to the failed queue - self.log.warning(( - 'Moving job to FailedJobRegistry ' - '(work-horse terminated unexpectedly; waitpid returned {})' - ).format(ret_val)) + self.log.warning( + ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format( + ret_val + ) + ) self.handle_job_failure( - job, queue=queue, - exc_string="Work-horse was terminated unexpectedly " - "(waitpid returned %s)" % ret_val + job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val ) def execute_job(self, job: 'Job', queue: 'Queue'): @@ -1009,8 +1050,7 @@ class Worker: msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) - def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, - exc_string=''): + def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''): """ Handles the failure or an executing job by: 1. Setting the job status to failed @@ -1023,10 +1063,7 @@ class Worker: with self.connection.pipeline() as pipeline: if started_job_registry is None: started_job_registry = StartedJobRegistry( - job.origin, - self.connection, - job_class=self.job_class, - serializer=self.serializer + job.origin, self.connection, job_class=self.job_class, serializer=self.serializer ) # check whether a job was stopped intentionally and set the job @@ -1045,14 +1082,19 @@ class Worker: started_job_registry.remove(job, pipeline=pipeline) if not self.disable_default_exception_handler and not retry: - failed_job_registry = FailedJobRegistry(job.origin, job.connection, - job_class=self.job_class, serializer=job.serializer) + failed_job_registry = FailedJobRegistry( + job.origin, job.connection, job_class=self.job_class, serializer=job.serializer + ) # Exception should be saved in job hash if server # doesn't support Redis streams _save_exc_to_job = not self.supports_redis_streams - failed_job_registry.add(job, ttl=job.failure_ttl, - exc_string=exc_string, pipeline=pipeline, - _save_exc_to_job=_save_exc_to_job) + failed_job_registry.add( + job, + ttl=job.failure_ttl, + exc_string=exc_string, + pipeline=pipeline, + _save_exc_to_job=_save_exc_to_job, + ) if self.supports_redis_streams: Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline) with suppress(redis.exceptions.ConnectionError): @@ -1061,9 +1103,7 @@ class Worker: self.set_current_job_id(None, pipeline=pipeline) self.increment_failed_job_count(pipeline) if job.started_at and job.ended_at: - self.increment_total_working_time( - job.ended_at - job.started_at, pipeline - ) + self.increment_total_working_time(job.ended_at - job.started_at, pipeline) if retry: job.retry(queue, pipeline) @@ -1099,9 +1139,7 @@ class Worker: self.set_current_job_id(None, pipeline=pipeline) self.increment_successful_job_count(pipeline=pipeline) - self.increment_total_working_time( - job.ended_at - job.started_at, pipeline # type: ignore - ) + self.increment_total_working_time(job.ended_at - job.started_at, pipeline) # type: ignore result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: @@ -1111,16 +1149,15 @@ class Worker: # doesn't support Redis streams include_result = not self.supports_redis_streams # Don't clobber user's meta dictionary! - job.save(pipeline=pipeline, include_meta=False, - include_result=include_result) + job.save(pipeline=pipeline, include_meta=False, include_result=include_result) if self.supports_redis_streams: - Result.create(job, Result.Type.SUCCESSFUL, return_value=job._result, - ttl=result_ttl, pipeline=pipeline) + Result.create( + job, Result.Type.SUCCESSFUL, return_value=job._result, ttl=result_ttl, pipeline=pipeline + ) finished_job_registry = queue.finished_job_registry finished_job_registry.add(job, result_ttl, pipeline) - job.cleanup(result_ttl, pipeline=pipeline, - remove_from_queue=False) + job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) self.log.debug('Removing job %s from StartedJobRegistry', job.id) started_job_registry.remove(job, pipeline=pipeline) @@ -1172,9 +1209,7 @@ class Worker: if job.success_callback: self.execute_success_callback(job, rv) - self.handle_job_success(job=job, - queue=queue, - started_job_registry=started_job_registry) + self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) except: # NOQA self.log.debug(f"Job {job.id} raised an exception.") job.ended_at = utcnow() @@ -1185,15 +1220,13 @@ class Worker: try: self.execute_failure_callback(job) except: # noqa - self.log.error( - 'Worker %s: error while executing failure callback', - self.key, exc_info=True - ) + self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True) exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) - self.handle_job_failure(job=job, exc_string=exc_string, queue=queue, - started_job_registry=started_job_registry) + self.handle_job_failure( + job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry + ) self.handle_exception(job, *exc_info) return False @@ -1237,10 +1270,9 @@ class Worker: # the properties below should be safe however extra.update({'queue': job.origin, 'job_id': job.id}) - + # func_name - self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, - extra=extra) + self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) @@ -1322,6 +1354,7 @@ class HerokuWorker(Worker): * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn causes the horse to crash `imminent_shutdown_delay` seconds later """ + imminent_shutdown_delay = 6 frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace'] @@ -1335,10 +1368,7 @@ class HerokuWorker(Worker): def handle_warm_shutdown_request(self): """If horse is alive send it SIGRTMIN""" if self.horse_pid != 0: - self.log.info( - 'Worker %s: warm shut down requested, sending horse SIGRTMIN signal', - self.key - ) + self.log.info('Worker %s: warm shut down requested, sending horse SIGRTMIN signal', self.key) self.kill_horse(sig=signal.SIGRTMIN) else: self.log.warning('Warm shut down requested, no horse found') @@ -1348,8 +1378,9 @@ class HerokuWorker(Worker): self.log.warning('Imminent shutdown, raising ShutDownImminentException immediately') self.request_force_stop_sigrtmin(signum, frame) else: - self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds', - self.imminent_shutdown_delay) + self.log.warning( + 'Imminent shutdown, raising ShutDownImminentException in %d seconds', self.imminent_shutdown_delay + ) signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin) signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin) signal.alarm(self.imminent_shutdown_delay) @@ -1367,7 +1398,7 @@ class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] class RandomWorker(Worker): |