summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2021-12-02 07:47:43 +0700
committerGitHub <noreply@github.com>2021-12-02 07:47:43 +0700
commit0147b30f2bd6b5c26c530d5cb7a3c3714029b59a (patch)
treecc69501cd5d9722bea228f1416820de48fa28b35 /tests
parent76ba690aafaa0fffbab1ee8f5db7c27c5327d792 (diff)
downloadrq-0147b30f2bd6b5c26c530d5cb7a3c3714029b59a.tar.gz
Fixes a bug that causes leftover job keys when result_ttl=0 (#1591)
* Fixes a bug that causes leftover job keys when result_ttl=0 * Fixed a buggy worker.maintain_heartbeats() behavior * Fixed a bug in worker.maintain_heartbeats().
Diffstat (limited to 'tests')
-rw-r--r--tests/test_worker.py30
1 files changed, 25 insertions, 5 deletions
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 327c617..0e944c2 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -317,6 +317,20 @@ class TestWorker(RQTestCase):
self.testconn.hdel(w.key, 'birth')
w.refresh()
+ def test_maintain_heartbeats(self):
+ """worker.maintain_heartbeats() shouldn't create new job keys"""
+ queue = Queue(connection=self.testconn)
+ worker = Worker([queue], connection=self.testconn)
+ job = queue.enqueue(say_hello)
+ worker.maintain_heartbeats(job)
+ self.assertTrue(self.testconn.exists(worker.key))
+ self.assertTrue(self.testconn.exists(job.key))
+
+ self.testconn.delete(job.key)
+
+ worker.maintain_heartbeats(job)
+ self.assertFalse(self.testconn.exists(job.key))
+
@slow
def test_heartbeat_survives_lost_connection(self):
with mock.patch.object(Worker, 'heartbeat') as mocked:
@@ -1202,23 +1216,29 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
"""
fooq = Queue('foo')
self.assertEqual(fooq.count, 0)
- w = Worker(fooq)
+ w = Worker([fooq], job_monitoring_interval=1)
+
sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
if os.path.exists(sentinel_file):
os.remove(sentinel_file)
- fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100)
- job, queue = w.dequeue_job_and_maintain_ttl(5)
+
+ job = fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100)
+
+ _, queue = w.dequeue_job_and_maintain_ttl(5)
+ w.prepare_job_execution(job)
w.fork_work_horse(job, queue)
job.timeout = 5
- w.job_monitoring_interval = 1
- now = utcnow()
+
time.sleep(1)
with open(sentinel_file) as f:
subprocess_pid = int(f.read().strip())
self.assertTrue(psutil.pid_exists(subprocess_pid))
+
w.monitor_work_horse(job, queue)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor
+
+ now = utcnow()
self.assertTrue((utcnow() - now).total_seconds() < total_time)
self.assertEqual(job.get_status(), JobStatus.FAILED)
failed_job_registry = FailedJobRegistry(queue=fooq)