diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2021-12-02 07:47:43 +0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-02 07:47:43 +0700 |
commit | 0147b30f2bd6b5c26c530d5cb7a3c3714029b59a (patch) | |
tree | cc69501cd5d9722bea228f1416820de48fa28b35 /tests | |
parent | 76ba690aafaa0fffbab1ee8f5db7c27c5327d792 (diff) | |
download | rq-0147b30f2bd6b5c26c530d5cb7a3c3714029b59a.tar.gz |
Fixes a bug that causes leftover job keys when result_ttl=0 (#1591)
* Fixes a bug that causes leftover job keys when result_ttl=0
* Fixed a buggy worker.maintain_heartbeats() behavior
* Fixed a bug in worker.maintain_heartbeats().
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_worker.py | 30 |
1 files changed, 25 insertions, 5 deletions
diff --git a/tests/test_worker.py b/tests/test_worker.py index 327c617..0e944c2 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -317,6 +317,20 @@ class TestWorker(RQTestCase): self.testconn.hdel(w.key, 'birth') w.refresh() + def test_maintain_heartbeats(self): + """worker.maintain_heartbeats() shouldn't create new job keys""" + queue = Queue(connection=self.testconn) + worker = Worker([queue], connection=self.testconn) + job = queue.enqueue(say_hello) + worker.maintain_heartbeats(job) + self.assertTrue(self.testconn.exists(worker.key)) + self.assertTrue(self.testconn.exists(job.key)) + + self.testconn.delete(job.key) + + worker.maintain_heartbeats(job) + self.assertFalse(self.testconn.exists(job.key)) + @slow def test_heartbeat_survives_lost_connection(self): with mock.patch.object(Worker, 'heartbeat') as mocked: @@ -1202,23 +1216,29 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): """ fooq = Queue('foo') self.assertEqual(fooq.count, 0) - w = Worker(fooq) + w = Worker([fooq], job_monitoring_interval=1) + sentinel_file = '/tmp/.rq_sentinel_work_horse_death' if os.path.exists(sentinel_file): os.remove(sentinel_file) - fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100) - job, queue = w.dequeue_job_and_maintain_ttl(5) + + job = fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100) + + _, queue = w.dequeue_job_and_maintain_ttl(5) + w.prepare_job_execution(job) w.fork_work_horse(job, queue) job.timeout = 5 - w.job_monitoring_interval = 1 - now = utcnow() + time.sleep(1) with open(sentinel_file) as f: subprocess_pid = int(f.read().strip()) self.assertTrue(psutil.pid_exists(subprocess_pid)) + w.monitor_work_horse(job, queue) fudge_factor = 1 total_time = w.job_monitoring_interval + 65 + fudge_factor + + now = utcnow() self.assertTrue((utcnow() - now).total_seconds() < total_time) self.assertEqual(job.get_status(), JobStatus.FAILED) failed_job_registry = FailedJobRegistry(queue=fooq) |