summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2017-11-18 07:13:10 +0700
committerSelwin Ong <selwin.ong@gmail.com>2017-11-18 07:13:10 +0700
commit8cca220cce8739bf1c2b1852830874e35bf1ebaf (patch)
treea32721c917865af39660455706e9d616757e2d2b
parentff36e0656e3484ec648c682214b48aad3f924e88 (diff)
downloadrq-8cca220cce8739bf1c2b1852830874e35bf1ebaf.tar.gz
job.exc_info is now compressed.
-rw-r--r--rq/job.py13
-rw-r--r--tests/test_job.py32
2 files changed, 40 insertions, 5 deletions
diff --git a/rq/job.py b/rq/job.py
index 83baacb..6125762 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import inspect
import warnings
+import zlib
from functools import partial
from uuid import uuid4
@@ -427,7 +428,6 @@ class Job(object):
self.started_at = to_date(as_text(obj.get('started_at')))
self.ended_at = to_date(as_text(obj.get('ended_at')))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
- self.exc_info = as_text(obj.get('exc_info'))
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self._status = as_text(obj.get('status') if obj.get('status') else None)
@@ -435,6 +435,15 @@ class Job(object):
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
+ raw_exc_info = obj.get('exc_info')
+ if raw_exc_info:
+ try:
+ self.exc_info = as_text(zlib.decompress(raw_exc_info))
+ except zlib.error:
+ # Fallback to uncompressed string
+ self.exc_info = as_text(raw_exc_info)
+
+
def to_dict(self, include_meta=True):
"""
Returns a serialization of the current job instance
@@ -462,7 +471,7 @@ class Job(object):
except:
obj['result'] = 'Unpickleable return value'
if self.exc_info is not None:
- obj['exc_info'] = self.exc_info
+ obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8'))
if self.timeout is not None:
obj['timeout'] = self.timeout
if self.result_ttl is not None:
diff --git a/tests/test_job.py b/tests/test_job.py
index fe9afcd..d10d55e 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -3,9 +3,10 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
from datetime import datetime
-import time
+import time
import sys
+import zlib
is_py2 = sys.version[0] == '2'
if is_py2:
@@ -15,7 +16,7 @@ else:
from tests import fixtures, RQTestCase
-from rq.compat import PY2
+from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job
from rq.queue import Queue, get_failed_queue
@@ -263,6 +264,31 @@ class TestJob(RQTestCase):
with self.assertRaises(AttributeError):
job.func # accessing the func property should fail
+ def test_compressed_exc_info_handling(self):
+ """Jobs handle both compressed and uncompressed exc_info"""
+ exception_string = 'Some exception'
+
+ job = Job.create(func=fixtures.say_hello, args=('Lionel',))
+ job.exc_info = exception_string
+ job.save()
+
+ # exc_info is stored in compressed format
+ exc_info = self.testconn.hget(job.key, 'exc_info')
+ self.assertEqual(
+ as_text(zlib.decompress(exc_info)),
+ exception_string
+ )
+
+ job.refresh()
+ self.assertEqual(job.exc_info, exception_string)
+
+ # Uncompressed exc_info is also handled
+ self.testconn.hset(job.key, 'exc_info', exception_string)
+
+ job.refresh()
+ self.assertEqual(job.exc_info, exception_string)
+
+
def test_custom_meta_is_persisted(self):
"""Additional meta data on jobs are stored persisted correctly."""
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
@@ -457,7 +483,7 @@ class TestJob(RQTestCase):
"""test if a job created with ttl expires [issue502]"""
queue = Queue(connection=self.testconn)
queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1)
- time.sleep(1)
+ time.sleep(1.1)
self.assertEqual(0, len(queue.get_jobs()))
def test_create_and_cancel_job(self):