summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormr-trouble <39416610+mr-trouble@users.noreply.github.com>2020-01-19 03:31:06 -0800
committerSelwin Ong <selwin.ong@gmail.com>2020-01-19 18:31:06 +0700
commit5f949f4cefed67de2554ef8dfc08bd62142a52a4 (patch)
tree7e27bee390380b2f37db5cdcd7ec75a5e58a44d7
parent37a6304a4f7db6a86c8e9216beb3214e6f716640 (diff)
downloadrq-5f949f4cefed67de2554ef8dfc08bd62142a52a4.tar.gz
Add a hard kill from the parent process with a 10% increased timeout … (#1169)
* Add a hard kill from the parent process with a 10% increased timeout in case the forked process gets stuck and cannot stop itself. * Added test for the force kill of the parent process. * Changed 10% to +1 second, and other misc changes based on review comments.
-rw-r--r--rq/worker.py9
-rw-r--r--tests/test_worker.py27
2 files changed, 36 insertions, 0 deletions
diff --git a/rq/worker.py b/rq/worker.py
index 25b98e8..92e63b4 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -674,6 +674,9 @@ class Worker(object):
either executes successfully or the status of the job is set to
failed
"""
+
+ ret_val = None
+ job.started_at = job.started_at or utcnow()
while True:
try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
@@ -683,6 +686,12 @@ class Worker(object):
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 5)
+
+ # Kill the job from this side if something is really wrong (interpreter lock/etc).
+ if (utcnow() - job.started_at).total_seconds() > (job.timeout + 1):
+ self.kill_horse()
+ break
+
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during
diff --git a/tests/test_worker.py b/tests/test_worker.py
index e983bf5..7d8ed5a 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -1055,6 +1055,33 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertTrue(job in failed_job_registry)
self.assertEqual(fooq.count, 0)
+ @slow
+ def test_work_horse_force_death(self):
+ """Simulate a frozen worker that doesn't observe the timeout properly.
+ Fake it by artificially setting the timeout of the parent process to
+ something much smaller after the process is already forked.
+ """
+ fooq = Queue('foo')
+ self.assertEqual(fooq.count, 0)
+ w = Worker(fooq)
+ sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
+ if os.path.exists(sentinel_file):
+ os.remove(sentinel_file)
+ fooq.enqueue(create_file_after_timeout, sentinel_file, 100)
+ job, queue = w.dequeue_job_and_maintain_ttl(5)
+ w.fork_work_horse(job, queue)
+ job.timeout = 5
+ w.job_monitoring_interval = 1
+ now = utcnow()
+ w.monitor_work_horse(job)
+ fudge_factor = 1
+ total_time = w.job_monitoring_interval + 5 + fudge_factor
+ self.assertTrue((utcnow() - now).total_seconds() < total_time)
+ self.assertEqual(job.get_status(), JobStatus.FAILED)
+ failed_job_registry = FailedJobRegistry(queue=fooq)
+ self.assertTrue(job in failed_job_registry)
+ self.assertEqual(fooq.count, 0)
+
def schedule_access_self():
q = Queue('default', connection=get_current_connection())