summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Feldman <ericfeldman93@gmail.com>2021-12-07 12:20:36 +0200
committerGitHub <noreply@github.com>2021-12-07 17:20:36 +0700
commit93f34c796f541ea4b1c156426d6524df05753826 (patch)
tree4804facf597f1a93d4bb1846b4d03c45ffd3d4d0
parent0147b30f2bd6b5c26c530d5cb7a3c3714029b59a (diff)
downloadrq-93f34c796f541ea4b1c156426d6524df05753826.tar.gz
call callbacks and prepare job when running sync (#1599)
* call signals * fix lines * add tests and fix faliure status bug * bump version
-rw-r--r--rq/queue.py17
-rw-r--r--rq/version.py2
-rw-r--r--tests/test_callbacks.py30
3 files changed, 48 insertions, 1 deletions
diff --git a/rq/queue.py b/rq/queue.py
index 5378607..79f0abf 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import uuid
+import sys
import warnings
from collections import namedtuple
@@ -572,7 +573,23 @@ nd
pipe.execute()
if not self._is_async:
+ job = self.run_sync(job)
+
+ return job
+
+ def run_sync(self, job):
+ with self.connection.pipeline() as pipeline:
+ job.prepare_for_execution('sync', pipeline)
+
+ try:
job = self.run_job(job)
+ except: # noqa
+ job.set_status(JobStatus.FAILED)
+ if job.failure_callback:
+ job.failure_callback(job, self.connection, *sys.exc_info())
+ else:
+ if job.success_callback:
+ job.success_callback(job, self.connection, job.result)
return job
diff --git a/rq/version.py b/rq/version.py
index 4c7666c..c84189e 100644
--- a/rq/version.py
+++ b/rq/version.py
@@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
-VERSION = '1.10.1'
+VERSION = '1.10.2'
diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py
index 57362ae..8d8edc7 100644
--- a/tests/test_callbacks.py
+++ b/tests/test_callbacks.py
@@ -47,6 +47,36 @@ class QueueCallbackTestCase(RQTestCase):
self.assertEqual(job.failure_callback, print)
+class SyncJobCallback(RQTestCase):
+ def test_success_callback(self):
+ """Test success callback is executed only when job is successful"""
+ queue = Queue(is_async=False)
+
+ job = queue.enqueue(say_hello, on_success=save_result)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+ self.assertEqual(
+ self.testconn.get('success_callback:%s' % job.id).decode(),
+ job.result
+ )
+
+ job = queue.enqueue(div_by_zero, on_success=save_result)
+ self.assertEqual(job.get_status(), JobStatus.FAILED)
+ self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))
+
+ def test_failure_callback(self):
+ """queue.enqueue* methods with on_failure is persisted correctly"""
+ queue = Queue(is_async=False)
+
+ job = queue.enqueue(div_by_zero, on_failure=save_exception)
+ self.assertEqual(job.get_status(), JobStatus.FAILED)
+ self.assertIn('div_by_zero',
+ self.testconn.get('failure_callback:%s' % job.id).decode())
+
+ job = queue.enqueue(div_by_zero, on_success=save_result)
+ self.assertEqual(job.get_status(), JobStatus.FAILED)
+ self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))
+
+
class WorkerCallbackTestCase(RQTestCase):
def test_success_callback(self):
"""Test success callback is executed only when job is successful"""