summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2017-10-28 17:45:49 +0700
committerGitHub <noreply@github.com>2017-10-28 17:45:49 +0700
commit1d7b5e834b7d9042215243ca6d0856a87e7f0f5d (patch)
treee398f5b7901313e27666b7decdc0daef89351e5b
parent32747b59fafc821689fcbaf8419b63d694e0e125 (diff)
downloadrq-1d7b5e834b7d9042215243ca6d0856a87e7f0f5d.tar.gz
Worker statistics (#897)
* First stab at implementing worker statistics. * Moved worker data restoration logic to worker.refresh(). * Failed and successful job counts are now properly incremented. * Worker now keeps track of total_working_time * Ensure job.ended_at is set in the case of unhandled job failure. * handle_job_failure shouldn't crash if job.started_at is not present.
-rw-r--r--rq/queue.py1
-rw-r--r--rq/worker.py94
-rw-r--r--tests/test_worker.py47
3 files changed, 119 insertions, 23 deletions
diff --git a/rq/queue.py b/rq/queue.py
index 11e6553..a537536 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -510,7 +510,6 @@ class FailedQueue(Queue):
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
- job.ended_at = utcnow()
job.exc_info = exc_info
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire
diff --git a/rq/worker.py b/rq/worker.py
index 8dd855a..7602c2e 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -132,15 +132,9 @@ class Worker(object):
connection=connection,
job_class=job_class,
queue_class=queue_class)
- queues, state, job_id = connection.hmget(worker.key, 'queues', 'state', 'current_job')
- queues = as_text(queues)
- worker._state = as_text(state or '?')
- worker._job_id = job_id or None
- if queues:
- worker.queues = [worker.queue_class(queue,
- connection=connection,
- job_class=job_class)
- for queue in queues.split(',')]
+
+ worker.refresh()
+
return worker
def __init__(self, queues, name=None, default_result_ttl=None, connection=None,
@@ -179,6 +173,10 @@ class Worker(object):
self.failed_queue = get_failed_queue(connection=self.connection,
job_class=self.job_class)
self.last_cleaned_at = None
+ self.successful_job_count = 0
+ self.failed_job_count = 0
+ self.total_working_time = 0
+ self.birth_date = None
# By default, push the "move-to-failed-queue" exception handler onto
# the stack
@@ -264,7 +262,11 @@ class Worker(object):
queues = ','.join(self.queue_names())
with self.connection._pipeline() as p:
p.delete(key)
- p.hset(key, 'birth', utcformat(utcnow()))
+ now = utcnow()
+ now_in_string = utcformat(utcnow())
+ self.birth_date = now
+ p.hset(key, 'birth', now_in_string)
+ p.hset(key, 'last_heartbeat', now_in_string)
p.hset(key, 'queues', queues)
p.sadd(self.redis_workers_keys, key)
p.expire(key, self.default_worker_ttl)
@@ -285,12 +287,12 @@ class Worker(object):
"""Sets the date on which the worker received a (warm) shutdown request"""
self.connection.hset(self.key, 'shutdown_requested_date', utcformat(utcnow()))
- @property
- def birth_date(self):
- """Fetches birth date from Redis."""
- birth_timestamp = self.connection.hget(self.key, 'birth')
- if birth_timestamp is not None:
- return utcparse(as_text(birth_timestamp))
+ # @property
+ # def birth_date(self):
+ # """Fetches birth date from Redis."""
+ # birth_timestamp = self.connection.hget(self.key, 'birth')
+ # if birth_timestamp is not None:
+ # return utcparse(as_text(birth_timestamp))
@property
def shutdown_requested_date(self):
@@ -525,9 +527,46 @@ class Worker(object):
timeout = max(timeout, self.default_worker_ttl)
connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout)
+ connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout))
+ def refresh(self):
+ data = self.connection.hmget(
+ self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
+ 'birth', 'failed_job_count', 'successful_job_count', 'total_working_time'
+ )
+ queues, state, job_id, last_heartbeat, birth, failed_job_count, successful_job_count, total_working_time = data
+ queues = as_text(queues)
+ self._state = as_text(state or '?')
+ self._job_id = job_id or None
+ self.last_heartbeat = utcparse(as_text(last_heartbeat))
+ self.birth_date = utcparse(as_text(birth))
+ if failed_job_count:
+ self.failed_job_count = int(as_text(failed_job_count))
+ if successful_job_count:
+ self.successful_job_count = int(as_text(successful_job_count))
+ if total_working_time:
+ self.total_working_time = float(as_text(total_working_time))
+
+ if queues:
+ self.queues = [self.queue_class(queue,
+ connection=self.connection,
+ job_class=self.job_class)
+ for queue in queues.split(',')]
+
+ def increment_failed_job_count(self, pipeline=None):
+ connection = pipeline if pipeline is not None else self.connection
+ connection.hincrby(self.key, 'failed_job_count', 1)
+
+ def increment_successful_job_count(self, pipeline=None):
+ connection = pipeline if pipeline is not None else self.connection
+ connection.hincrby(self.key, 'successful_job_count', 1)
+
+ def increment_total_working_time(self, job_execution_time, pipeline):
+ pipeline.hincrbyfloat(self.key, 'total_working_time',
+ job_execution_time.microseconds)
+
def fork_work_horse(self, job, queue):
"""Spawns a work horse to perform the actual work and passes it a job.
"""
@@ -567,6 +606,10 @@ class Worker(object):
if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
+
+ if not job.ended_at:
+ job.ended_at = utcnow()
+
self.handle_job_failure(job=job)
# Unhandled failure: move the job to the failed queue
@@ -635,8 +678,7 @@ class Worker(object):
job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
- self.connection._hset(job.key, 'started_at',
- utcformat(utcnow()), pipeline)
+ pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
pipeline.execute()
msg = 'Processing {0} from {1} since {2}'
@@ -648,7 +690,6 @@ class Worker(object):
2. Removing the job from the started_job_registry
3. Setting the workers current job to None
"""
-
with self.connection._pipeline() as pipeline:
if started_job_registry is None:
started_job_registry = StartedJobRegistry(job.origin,
@@ -657,6 +698,11 @@ class Worker(object):
job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline)
+ self.increment_failed_job_count(pipeline)
+ if job.started_at and job.ended_at:
+ self.increment_total_working_time(job.ended_at - job.started_at,
+ pipeline)
+
try:
pipeline.execute()
except Exception:
@@ -665,6 +711,7 @@ class Worker(object):
pass
def handle_job_success(self, job, queue, started_job_registry):
+
with self.connection._pipeline() as pipeline:
while True:
try:
@@ -675,6 +722,10 @@ class Worker(object):
queue.enqueue_dependents(job, pipeline=pipeline)
self.set_current_job_id(None, pipeline=pipeline)
+ self.increment_successful_job_count(pipeline=pipeline)
+ self.increment_total_working_time(
+ job.ended_at - job.started_at, pipeline
+ )
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
@@ -708,7 +759,8 @@ class Worker(object):
self.connection,
job_class=self.job_class)
- try:
+ try:
+ job.started_at = utcnow()
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform()
@@ -722,6 +774,8 @@ class Worker(object):
queue=queue,
started_job_registry=started_job_registry)
except Exception:
+
+ job.ended_at = utcnow()
self.handle_job_failure(job=job,
started_job_registry=started_job_registry)
self.handle_exception(job, *sys.exc_info())
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 85435e7..3f76e4d 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import os
import shutil
-from datetime import timedelta
+from datetime import datetime, timedelta
from time import sleep
import signal
import time
@@ -197,6 +197,18 @@ class TestWorker(RQTestCase):
self.assertEqual(q.count, 0)
self.assertEqual(failed_q.count, 1)
+ def test_heartbeat(self):
+ """Heartbeat saves last_heartbeat"""
+ q = Queue()
+ w = Worker([q])
+ w.register_birth()
+ w.heartbeat()
+ last_heartbeat = self.testconn.hget(w.key, 'last_heartbeat')
+
+ self.assertTrue(last_heartbeat is not None)
+ w = Worker.find_by_key(w.key)
+ self.assertIsInstance(w.last_heartbeat, datetime)
+
def test_work_fails(self):
"""Failing jobs are put on the failed queue."""
q = Queue()
@@ -230,6 +242,36 @@ class TestWorker(RQTestCase):
self.assertEqual(str(job.enqueued_at), enqueued_at_date)
self.assertIsNotNone(job.exc_info) # should contain exc_info
+ def test_statistics(self):
+ """Successful and failed job counts are saved properly"""
+ q = Queue()
+ job = q.enqueue(div_by_zero)
+ w = Worker([q])
+ w.register_birth()
+
+ self.assertEqual(w.failed_job_count, 0)
+ self.assertEqual(w.successful_job_count, 0)
+ self.assertEqual(w.total_working_time, 0)
+
+ registry = StartedJobRegistry(connection=w.connection)
+ job.started_at = utcnow()
+ job.ended_at = job.started_at + timedelta(seconds=0.75)
+ w.handle_job_failure(job)
+ w.handle_job_success(job, q, registry)
+
+ w.refresh()
+ self.assertEqual(w.failed_job_count, 1)
+ self.assertEqual(w.successful_job_count, 1)
+ self.assertEqual(w.total_working_time, 1500000) # 1.5 seconds in microseconds
+
+ w.handle_job_failure(job)
+ w.handle_job_success(job, q, registry)
+
+ w.refresh()
+ self.assertEqual(w.failed_job_count, 2)
+ self.assertEqual(w.successful_job_count, 2)
+ self.assertEqual(w.total_working_time, 3000000)
+
def test_custom_exc_handling(self):
"""Custom exception handling."""
def black_hole(job, *exc_info):
@@ -559,7 +601,7 @@ class TestWorker(RQTestCase):
death_date = w.death_date
self.assertIsNotNone(death_date)
- self.assertEqual(type(death_date).__name__, 'datetime')
+ self.assertIsInstance(death_date, datetime)
def test_clean_queue_registries(self):
"""worker.clean_registries sets last_cleaned_at and cleans registries."""
@@ -799,6 +841,7 @@ def schedule_access_self():
q.enqueue(access_self)
+@pytest.mark.skipif(sys.platform == 'darwin', reason='Fails on OS X')
class TestWorkerSubprocess(RQTestCase):
def setUp(self):
super(TestWorkerSubprocess, self).setUp()