diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2017-11-18 07:13:10 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2017-11-18 07:13:10 +0700 |
commit | 8cca220cce8739bf1c2b1852830874e35bf1ebaf (patch) | |
tree | a32721c917865af39660455706e9d616757e2d2b | |
parent | ff36e0656e3484ec648c682214b48aad3f924e88 (diff) | |
download | rq-8cca220cce8739bf1c2b1852830874e35bf1ebaf.tar.gz |
job.exc_info is now compressed.
-rw-r--r-- | rq/job.py | 13 | ||||
-rw-r--r-- | tests/test_job.py | 32 |
2 files changed, 40 insertions, 5 deletions
@@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function, import inspect import warnings +import zlib from functools import partial from uuid import uuid4 @@ -427,7 +428,6 @@ class Job(object): self.started_at = to_date(as_text(obj.get('started_at'))) self.ended_at = to_date(as_text(obj.get('ended_at'))) self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa - self.exc_info = as_text(obj.get('exc_info')) self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self._status = as_text(obj.get('status') if obj.get('status') else None) @@ -435,6 +435,15 @@ class Job(object): self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} + raw_exc_info = obj.get('exc_info') + if raw_exc_info: + try: + self.exc_info = as_text(zlib.decompress(raw_exc_info)) + except zlib.error: + # Fallback to uncompressed string + self.exc_info = as_text(raw_exc_info) + + def to_dict(self, include_meta=True): """ Returns a serialization of the current job instance @@ -462,7 +471,7 @@ class Job(object): except: obj['result'] = 'Unpickleable return value' if self.exc_info is not None: - obj['exc_info'] = self.exc_info + obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8')) if self.timeout is not None: obj['timeout'] = self.timeout if self.result_ttl is not None: diff --git a/tests/test_job.py b/tests/test_job.py index fe9afcd..d10d55e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -3,9 +3,10 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from datetime import datetime -import time +import time import sys +import zlib is_py2 = sys.version[0] == '2' if is_py2: @@ -15,7 +16,7 @@ else: from tests import fixtures, RQTestCase -from rq.compat import PY2 +from rq.compat import PY2, as_text from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import Job, get_current_job, JobStatus, cancel_job, requeue_job from rq.queue import Queue, get_failed_queue @@ -263,6 +264,31 @@ class TestJob(RQTestCase): with self.assertRaises(AttributeError): job.func # accessing the func property should fail + def test_compressed_exc_info_handling(self): + """Jobs handle both compressed and uncompressed exc_info""" + exception_string = 'Some exception' + + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) + job.exc_info = exception_string + job.save() + + # exc_info is stored in compressed format + exc_info = self.testconn.hget(job.key, 'exc_info') + self.assertEqual( + as_text(zlib.decompress(exc_info)), + exception_string + ) + + job.refresh() + self.assertEqual(job.exc_info, exception_string) + + # Uncompressed exc_info is also handled + self.testconn.hset(job.key, 'exc_info', exception_string) + + job.refresh() + self.assertEqual(job.exc_info, exception_string) + + def test_custom_meta_is_persisted(self): """Additional meta data on jobs are stored persisted correctly.""" job = Job.create(func=fixtures.say_hello, args=('Lionel',)) @@ -457,7 +483,7 @@ class TestJob(RQTestCase): """test if a job created with ttl expires [issue502]""" queue = Queue(connection=self.testconn) queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1) - time.sleep(1) + time.sleep(1.1) self.assertEqual(0, len(queue.get_jobs())) def test_create_and_cancel_job(self): |