summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Ruslan Mullakhmetov <theambient@me.com> 2020-10-26 14:42:02 +0100
committer: GitHub <noreply@github.com> 2020-10-26 20:42:02 +0700
commit: ed264f08bb53581bf4b2eb3f1e013b9615c37c52 (patch)
tree: f3d22c336f22f1576a8eb5a331e70e47483cdb91
parent: a721db34b171fd38eb262c858cc5ca32cc954bfb (diff)
download: rq-ed264f08bb53581bf4b2eb3f1e013b9615c37c52.tar.gz
feat: added job heartbeat to track whether job is actually executing (#1349)
* feat: added job heartbeat to track whether job is actually executing
  heartbeat might be needed in cases when worker was hardkilled or the whole VM/docker was forcibly rebooted.
* fixed tests
* fixed test coverage issue
* chore: renamed job.heartbeat stuff according to review feedback
* chore: pipelined worker heartbeat and job heartbeat
* docs: documented job.heartbeat property
* fixes after review
* docs: updated last_heartbeat description
* chore: review

Co-authored-by: Ruslan Mullakhmetov <ruslan@twentythree.net>
-rw-r--r-- docs/docs/jobs.md 1
-rw-r--r-- rq/job.py 8
-rw-r--r-- rq/worker.py 8
-rw-r--r-- tests/test_job.py 13
4 files changed, 26 insertions, 4 deletions
diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md
index 4720b63..7ede815 100644
--- a/docs/docs/jobs.md
+++ b/docs/docs/jobs.md
@@ -121,6 +121,7 @@ Some interesting job attributes include:
* `job.started_at`
* `job.ended_at`
* `job.exc_info` stores exception information if job doesn't finish successfully.
+* `job.last_heartbeat` the latest timestamp that's periodically updated when the job is executing. Can be used to determine if the job is still active.
If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`.
diff --git a/rq/job.py b/rq/job.py
index 6084d92..868a44b 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -351,6 +351,7 @@ class Job(object):
# retry_intervals is a list of int e.g [60, 120, 240]
self.retry_intervals = None
self.redis_server_version = None
+ self.last_heartbeat = None
def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@@ -384,6 +385,11 @@ class Job(object):
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
+ def heartbeat(self, heartbeat, pipeline=None):
+ self.last_heartbeat = heartbeat
+ connection = pipeline if pipeline is not None else self.connection
+ connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
+
id = property(get_id, set_id)
@classmethod
@@ -477,6 +483,7 @@ class Job(object):
self.enqueued_at = str_to_date(obj.get('enqueued_at'))
self.started_at = str_to_date(obj.get('started_at'))
self.ended_at = str_to_date(obj.get('ended_at'))
+ self.last_heartbeat = str_to_date(obj.get('last_heartbeat'))
result = obj.get('result')
if result:
try:
@@ -530,6 +537,7 @@ class Job(object):
'data': zlib.compress(self.data),
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
+ 'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '',
}
if self.retries_left is not None:
diff --git a/rq/worker.py b/rq/worker.py
index 13ed298..cfbc675 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -751,14 +751,19 @@ class Worker(object):
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
- self.heartbeat(self.job_monitoring_interval + 60)
# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 60):
+ self.heartbeat(self.job_monitoring_interval + 60)
self.kill_horse()
self.wait_for_horse()
break
+ with self.connection.pipeline() as pipeline:
+ self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
+ job.heartbeat(utcnow(), pipeline=pipeline)
+ pipeline.execute()
+
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during
@@ -853,6 +858,7 @@ class Worker(object):
job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
+ job.heartbeat(utcnow(), pipeline=pipeline)
pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
pipeline.execute()
diff --git a/tests/test_job.py b/tests/test_job.py
index a993c60..2987c5b 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -222,8 +222,15 @@ class TestJob(RQTestCase):
# ... and no other keys are stored
self.assertEqual(
sorted(self.testconn.hkeys(job.key)),
- [b'created_at', b'data', b'description', b'ended_at', b'started_at'])
-
+ [b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at'])
+
+ self.assertEqual(job.last_heartbeat, None)
+ self.assertEqual(job.last_heartbeat, None)
+
+ ts = utcnow()
+ job.heartbeat(ts)
+ self.assertEqual(job.last_heartbeat, ts)
+
def test_persistence_of_retry_data(self):
"""Retry related data is stored and restored properly"""
job = Job.create(func=fixtures.some_calculation)
@@ -979,4 +986,4 @@ class TestJob(RQTestCase):
self.assertEqual(job.get_retry_interval(), 2)
job.retries_left = 1
self.assertEqual(job.get_retry_interval(), 3)
- \ No newline at end of file
+