diff options
author | Eric Feldman <ericfeldman93@gmail.com> | 2021-12-07 12:20:36 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-07 17:20:36 +0700 |
commit | 93f34c796f541ea4b1c156426d6524df05753826 (patch) | |
tree | 4804facf597f1a93d4bb1846b4d03c45ffd3d4d0 | |
parent | 0147b30f2bd6b5c26c530d5cb7a3c3714029b59a (diff) | |
download | rq-93f34c796f541ea4b1c156426d6524df05753826.tar.gz |
call callbacks and prepare job when running sync (#1599)
* call signals
* fix lines
* add tests and fix faliure status bug
* bump version
-rw-r--r-- | rq/queue.py | 17 | ||||
-rw-r--r-- | rq/version.py | 2 | ||||
-rw-r--r-- | tests/test_callbacks.py | 30 |
3 files changed, 48 insertions, 1 deletions
diff --git a/rq/queue.py b/rq/queue.py index 5378607..79f0abf 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import uuid +import sys import warnings from collections import namedtuple @@ -572,7 +573,23 @@ nd pipe.execute() if not self._is_async: + job = self.run_sync(job) + + return job + + def run_sync(self, job): + with self.connection.pipeline() as pipeline: + job.prepare_for_execution('sync', pipeline) + + try: job = self.run_job(job) + except: # noqa + job.set_status(JobStatus.FAILED) + if job.failure_callback: + job.failure_callback(job, self.connection, *sys.exc_info()) + else: + if job.success_callback: + job.success_callback(job, self.connection, job.result) return job diff --git a/rq/version.py b/rq/version.py index 4c7666c..c84189e 100644 --- a/rq/version.py +++ b/rq/version.py @@ -2,4 +2,4 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -VERSION = '1.10.1' +VERSION = '1.10.2' diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 57362ae..8d8edc7 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -47,6 +47,36 @@ class QueueCallbackTestCase(RQTestCase): self.assertEqual(job.failure_callback, print) +class SyncJobCallback(RQTestCase): + def test_success_callback(self): + """Test success callback is executed only when job is successful""" + queue = Queue(is_async=False) + + job = queue.enqueue(say_hello, on_success=save_result) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + self.assertEqual( + self.testconn.get('success_callback:%s' % job.id).decode(), + job.result + ) + + job = queue.enqueue(div_by_zero, on_success=save_result) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertFalse(self.testconn.exists('success_callback:%s' % job.id)) + + def test_failure_callback(self): + """queue.enqueue* methods with on_failure is persisted correctly""" + queue = Queue(is_async=False) + + job = queue.enqueue(div_by_zero, on_failure=save_exception) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertIn('div_by_zero', + self.testconn.get('failure_callback:%s' % job.id).decode()) + + job = queue.enqueue(div_by_zero, on_success=save_result) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id)) + + class WorkerCallbackTestCase(RQTestCase): def test_success_callback(self): """Test success callback is executed only when job is successful""" |