summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Ruslan Mullakhmetov <theambient@me.com> 2020-08-04 11:48:01 +0200
committer: GitHub <noreply@github.com> 2020-08-04 16:48:01 +0700
commit: c2931b45b624215d6b4ab6f57e8d95b335472814 (patch)
tree: f23523173f3bc75e19c2fbb71fae38dd88872980
parent: 57f286eac4c7f4a02deff963ccd7abceb4687855 (diff)
download: rq-c2931b45b624215d6b4ab6f57e8d95b335472814.tar.gz
handled unhandled exceptions in horse (#1303)
* handled unhandled exceptions in horse to prevent a job from being silently dropped without going into FailedRegistry
* changes after review
* made sure that work_horse always terminates in a proper way with tests
* minor refactoring
* fix for failing test
* fixes for the other tests
  - removed exception handling (done in monitor_work_horse)
  - adjusted some tests for the checks that are not relevant anymore
* review suggested changes
* cleanup

Co-authored-by: Ruslan Mullakhmetov <ruslan@twentythree.net>
-rw-r--r--  rq/worker.py  9
-rw-r--r--  tests/fixtures.py  5
-rw-r--r--  tests/test_worker.py  45
3 files changed, 44 insertions, 15 deletions
diff --git a/rq/worker.py b/rq/worker.py
index bae12f1..c646970 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -703,6 +703,7 @@ class Worker(object):
os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0:
self.main_work_horse(job, queue)
+ os._exit(0) # just in case
else:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
@@ -785,7 +786,10 @@ class Worker(object):
self.setup_work_horse_signals()
self._is_horse = True
self.log = logger
- self.perform_job(job, queue)
+ try:
+ self.perform_job(job, queue)
+ except:
+ os._exit(1)
# os._exit() is the way to exit from childs after a fork(), in
# contrast to the regular sys.exit()
@@ -922,12 +926,13 @@ class Worker(object):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
- self.prepare_job_execution(job, heartbeat_ttl)
push_connection(self.connection)
started_job_registry = queue.started_job_registry
try:
+ self.prepare_job_execution(job, heartbeat_ttl)
+
job.started_at = utcnow()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
diff --git a/tests/fixtures.py b/tests/fixtures.py
index a1f507a..057b932 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -37,6 +37,11 @@ def do_nothing():
"""The best job in the world."""
pass
+def raise_exc():
+ raise Exception('raise_exc error')
+
+def raise_exc_mock():
+ return raise_exc
def div_by_zero(x):
"""Prepare for a division-by-zero exception."""
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 931c1fa..a5d4026 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -25,7 +25,7 @@ from tests import RQTestCase, slow
from tests.fixtures import (
access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing,
kill_worker, long_running_job, modify_self, modify_self_and_error,
- run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid,
+ run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock
)
from rq import Queue, SimpleWorker, Worker, get_current_connection
@@ -312,6 +312,37 @@ class TestWorker(RQTestCase):
self.assertEqual(str(job.enqueued_at), enqueued_at_date)
self.assertTrue(job.exc_info) # should contain exc_info
+ def test_horse_fails(self):
+ """Tests that job status is set to FAILED even if horse unexpectedly fails"""
+ q = Queue()
+ self.assertEqual(q.count, 0)
+
+ # Action
+ job = q.enqueue(say_hello)
+ self.assertEqual(q.count, 1)
+
+ # keep for later
+ enqueued_at_date = str(job.enqueued_at)
+
+ w = Worker([q])
+ with mock.patch.object(w, 'perform_job', new_callable=raise_exc_mock):
+ w.work(burst=True) # should silently pass
+
+ # Postconditions
+ self.assertEqual(q.count, 0)
+ failed_job_registry = FailedJobRegistry(queue=q)
+ self.assertTrue(job in failed_job_registry)
+ self.assertEqual(w.get_current_job_id(), None)
+
+ # Check the job
+ job = Job.fetch(job.id)
+ self.assertEqual(job.origin, q.name)
+
+ # Should be the original enqueued_at date, not the date of enqueueing
+ # to the failed queue
+ self.assertEqual(str(job.enqueued_at), enqueued_at_date)
+ self.assertTrue(job.exc_info) # should contain exc_info
+
def test_statistics(self):
"""Successful and failed job counts are saved properly"""
queue = Queue()
@@ -1206,10 +1237,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertEqual(p.exitcode, 1)
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
- with open(os.path.join(self.sandbox, 'stderr.log')) as f:
- stderr = f.read().strip('\n')
- err = 'ShutDownImminentException: shut down imminent (signal: SIGRTMIN)'
- self.assertTrue(stderr.endswith(err), stderr)
@slow
def test_1_sec_shutdown(self):
@@ -1226,10 +1253,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
- with open(os.path.join(self.sandbox, 'stderr.log')) as f:
- stderr = f.read().strip('\n')
- err = 'ShutDownImminentException: shut down imminent (signal: SIGALRM)'
- self.assertTrue(stderr.endswith(err), stderr)
@slow
def test_shutdown_double_sigrtmin(self):
@@ -1247,10 +1270,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
- with open(os.path.join(self.sandbox, 'stderr.log')) as f:
- stderr = f.read().strip('\n')
- err = 'ShutDownImminentException: shut down imminent (signal: SIGRTMIN)'
- self.assertTrue(stderr.endswith(err), stderr)
@mock.patch('rq.worker.logger.info')
def test_handle_shutdown_request(self, mock_logger_info):