summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2021-12-02 07:47:43 +0700
committerGitHub <noreply@github.com>2021-12-02 07:47:43 +0700
commit0147b30f2bd6b5c26c530d5cb7a3c3714029b59a (patch)
treecc69501cd5d9722bea228f1416820de48fa28b35
parent76ba690aafaa0fffbab1ee8f5db7c27c5327d792 (diff)
downloadrq-0147b30f2bd6b5c26c530d5cb7a3c3714029b59a.tar.gz
Fixes a bug that causes leftover job keys when result_ttl=0 (#1591)
- Fixes a bug that causes leftover job keys when result_ttl=0
- Fixed a buggy worker.maintain_heartbeats() behavior
- Fixed a bug in worker.maintain_heartbeats().
-rw-r--r--rq/worker.py29
-rw-r--r--tests/test_worker.py30
2 files changed, 49 insertions, 10 deletions
diff --git a/rq/worker.py b/rq/worker.py
index 11ea290..cac6bdb 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -810,11 +810,7 @@ class Worker:
self.wait_for_horse()
break
- with self.connection.pipeline() as pipeline:
- self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
- ttl = self.get_heartbeat_ttl(job)
- job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True)
- pipeline.execute()
+ self.maintain_heartbeats(job)
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
@@ -832,7 +828,9 @@ class Worker:
self._horse_pid = 0 # Set horse PID to 0, horse has finished working
if ret_val == os.EX_OK: # The process exited normally.
return
+
job_status = job.get_status()
+
if job_status is None: # Job completed and its ttl has expired
return
elif job_status == JobStatus.STOPPED:
@@ -869,6 +867,27 @@ class Worker:
self.monitor_work_horse(job, queue)
self.set_state(WorkerStatus.IDLE)
+ def maintain_heartbeats(self, job):
+ """Updates worker and job's last heartbeat field. If job was
+ enqueued with `result_ttl=0`, a race condition could happen where this heartbeat
+ arrives after job has been deleted, leaving a job key that contains only
+ `last_heartbeat` field.
+
+ hset() is used when updating job's timestamp. This command returns 1 if a new
+ Redis key is created, 0 otherwise. So in this case we check the return of job's
+ heartbeat() command. If a new key was created, this means the job was already
+ deleted. In this case, we simply send another delete command to remove the key.
+
+ https://github.com/rq/rq/issues/1450
+ """
+ with self.connection.pipeline() as pipeline:
+ self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
+ ttl = self.get_heartbeat_ttl(job)
+ job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True)
+ results = pipeline.execute()
+ if results[2] == 1:
+ self.connection.delete(job.key)
+
def main_work_horse(self, job, queue):
"""This is the entry point of the newly spawned work horse."""
# After fork()'ing, always assure we are generating random sequences
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 327c617..0e944c2 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -317,6 +317,20 @@ class TestWorker(RQTestCase):
self.testconn.hdel(w.key, 'birth')
w.refresh()
+ def test_maintain_heartbeats(self):
+ """worker.maintain_heartbeats() shouldn't create new job keys"""
+ queue = Queue(connection=self.testconn)
+ worker = Worker([queue], connection=self.testconn)
+ job = queue.enqueue(say_hello)
+ worker.maintain_heartbeats(job)
+ self.assertTrue(self.testconn.exists(worker.key))
+ self.assertTrue(self.testconn.exists(job.key))
+
+ self.testconn.delete(job.key)
+
+ worker.maintain_heartbeats(job)
+ self.assertFalse(self.testconn.exists(job.key))
+
@slow
def test_heartbeat_survives_lost_connection(self):
with mock.patch.object(Worker, 'heartbeat') as mocked:
@@ -1202,23 +1216,29 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
"""
fooq = Queue('foo')
self.assertEqual(fooq.count, 0)
- w = Worker(fooq)
+ w = Worker([fooq], job_monitoring_interval=1)
+
sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
if os.path.exists(sentinel_file):
os.remove(sentinel_file)
- fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100)
- job, queue = w.dequeue_job_and_maintain_ttl(5)
+
+ job = fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100)
+
+ _, queue = w.dequeue_job_and_maintain_ttl(5)
+ w.prepare_job_execution(job)
w.fork_work_horse(job, queue)
job.timeout = 5
- w.job_monitoring_interval = 1
- now = utcnow()
+
time.sleep(1)
with open(sentinel_file) as f:
subprocess_pid = int(f.read().strip())
self.assertTrue(psutil.pid_exists(subprocess_pid))
+
w.monitor_work_horse(job, queue)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor
+
+ now = utcnow()
self.assertTrue((utcnow() - now).total_seconds() < total_time)
self.assertEqual(job.get_status(), JobStatus.FAILED)
failed_job_registry = FailedJobRegistry(queue=fooq)