diff options
author | mr-trouble <39416610+mr-trouble@users.noreply.github.com> | 2020-01-19 03:31:06 -0800 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2020-01-19 18:31:06 +0700 |
commit | 5f949f4cefed67de2554ef8dfc08bd62142a52a4 (patch) | |
tree | 7e27bee390380b2f37db5cdcd7ec75a5e58a44d7 | |
parent | 37a6304a4f7db6a86c8e9216beb3214e6f716640 (diff) | |
download | rq-5f949f4cefed67de2554ef8dfc08bd62142a52a4.tar.gz |
Add a hard kill from the parent process with a 10% increased timeout in case the forked process gets stuck and cannot stop itself (#1169)
* Add a hard kill from the parent process with a 10% increased timeout in case the forked process gets stuck and cannot stop itself.
* Added test for the force kill of the parent process.
* Changed 10% to +1 second, and other misc changes based on review comments.
-rw-r--r-- | rq/worker.py | 9 | ||||
-rw-r--r-- | tests/test_worker.py | 27 |
2 files changed, 36 insertions, 0 deletions
diff --git a/rq/worker.py b/rq/worker.py index 25b98e8..92e63b4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -674,6 +674,9 @@ class Worker(object): either executes successfully or the status of the job is set to failed """ + + ret_val = None + job.started_at = job.started_at or utcnow() while True: try: with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): @@ -683,6 +686,12 @@ class Worker(object): # Horse has not exited yet and is still running. # Send a heartbeat to keep the worker alive. self.heartbeat(self.job_monitoring_interval + 5) + + # Kill the job from this side if something is really wrong (interpreter lock/etc). + if (utcnow() - job.started_at).total_seconds() > (job.timeout + 1): + self.kill_horse() + break + except OSError as e: # In case we encountered an OSError due to EINTR (which is # caused by a SIGINT or SIGTERM signal during diff --git a/tests/test_worker.py b/tests/test_worker.py index e983bf5..7d8ed5a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1055,6 +1055,33 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertTrue(job in failed_job_registry) self.assertEqual(fooq.count, 0) + @slow + def test_work_horse_force_death(self): + """Simulate a frozen worker that doesn't observe the timeout properly. + Fake it by artificially setting the timeout of the parent process to + something much smaller after the process is already forked. 
+ """ + fooq = Queue('foo') + self.assertEqual(fooq.count, 0) + w = Worker(fooq) + sentinel_file = '/tmp/.rq_sentinel_work_horse_death' + if os.path.exists(sentinel_file): + os.remove(sentinel_file) + fooq.enqueue(create_file_after_timeout, sentinel_file, 100) + job, queue = w.dequeue_job_and_maintain_ttl(5) + w.fork_work_horse(job, queue) + job.timeout = 5 + w.job_monitoring_interval = 1 + now = utcnow() + w.monitor_work_horse(job) + fudge_factor = 1 + total_time = w.job_monitoring_interval + 5 + fudge_factor + self.assertTrue((utcnow() - now).total_seconds() < total_time) + self.assertEqual(job.get_status(), JobStatus.FAILED) + failed_job_registry = FailedJobRegistry(queue=fooq) + self.assertTrue(job in failed_job_registry) + self.assertEqual(fooq.count, 0) + def schedule_access_self(): q = Queue('default', connection=get_current_connection()) |