summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2017-11-18 08:13:36 +0700
committerSelwin Ong <selwin.ong@gmail.com>2017-11-18 08:13:36 +0700
commite25c5dbc16333c2c976539bad48a583cd007bfb7 (patch)
tree70fe8cf97cd3f77219af61c7eac603e683f6d458
parent8cca220cce8739bf1c2b1852830874e35bf1ebaf (diff)
downloadrq-e25c5dbc16333c2c976539bad48a583cd007bfb7.tar.gz
job.data is now stored in compressed format.worker-compression
-rw-r--r--rq/job.py10
-rw-r--r--tests/test_job.py31
-rw-r--r--tests/test_worker.py20
3 files changed, 45 insertions, 16 deletions
diff --git a/rq/job.py b/rq/job.py
index 6125762..eb83250 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -417,10 +417,16 @@ class Job(object):
return utcparse(as_text(date_str))
try:
- self.data = obj['data']
+ raw_data = obj['data']
except KeyError:
raise NoSuchJobError('Unexpected job format: {0}'.format(obj))
+ try:
+ self.data = zlib.decompress(raw_data)
+ except zlib.error:
+ # Fallback to uncompressed string
+ self.data = raw_data
+
self.created_at = to_date(as_text(obj.get('created_at')))
self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description'))
@@ -453,7 +459,7 @@ class Job(object):
"""
obj = {}
obj['created_at'] = utcformat(self.created_at or utcnow())
- obj['data'] = self.data
+ obj['data'] = zlib.compress(self.data)
if self.origin is not None:
obj['origin'] = self.origin
diff --git a/tests/test_job.py b/tests/test_job.py
index d10d55e..b3512a4 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -161,7 +161,7 @@ class TestJob(RQTestCase):
self.assertEqual(self.testconn.type(job.key), b'hash')
# Saving writes pickled job data
- unpickled_data = loads(self.testconn.hget(job.key, 'data'))
+ unpickled_data = loads(zlib.decompress(self.testconn.hget(job.key, 'data')))
self.assertEqual(unpickled_data[0], 'tests.fixtures.some_calculation')
def test_fetch(self):
@@ -237,7 +237,8 @@ class TestJob(RQTestCase):
def test_fetching_unreadable_data(self):
"""Fetching succeeds on unreadable data, but lazy props fail."""
# Set up
- job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2))
+ job = Job.create(func=fixtures.some_calculation, args=(3, 4),
+ kwargs=dict(z=2))
job.save()
# Just replace the data hkey with some random noise
@@ -256,9 +257,10 @@ class TestJob(RQTestCase):
# Now slightly modify the job to make it unimportable (this is
# equivalent to a worker not having the most up-to-date source code
# and unable to import the function)
- data = self.testconn.hget(job.key, 'data')
- unimportable_data = data.replace(b'say_hello', b'nay_hello')
- self.testconn.hset(job.key, 'data', unimportable_data)
+ job_data = job.data
+ unimportable_data = job_data.replace(b'say_hello', b'nay_hello')
+
+ self.testconn.hset(job.key, 'data', zlib.compress(unimportable_data))
job.refresh()
with self.assertRaises(AttributeError):
@@ -268,7 +270,7 @@ class TestJob(RQTestCase):
"""Jobs handle both compressed and uncompressed exc_info"""
exception_string = 'Some exception'
- job = Job.create(func=fixtures.say_hello, args=('Lionel',))
+ job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.exc_info = exception_string
job.save()
@@ -288,6 +290,23 @@ class TestJob(RQTestCase):
job.refresh()
self.assertEqual(job.exc_info, exception_string)
+ def test_compressed_job_data_handling(self):
+ """Jobs handle both compressed and uncompressed data"""
+
+ job = Job.create(func=fixtures.say_hello, args=('Lionel',))
+ job.save()
+
+ # Job data is stored in compressed format
+ job_data = job.data
+ self.assertEqual(
+ zlib.compress(job_data),
+ self.testconn.hget(job.key, 'data')
+ )
+
+ self.testconn.hset(job.key, 'data', job_data)
+ job.refresh()
+ self.assertEqual(job.data, job_data)
+
def test_custom_meta_is_persisted(self):
"""Additional meta data on jobs are stored persisted correctly."""
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 4563e30..8e44d29 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -4,13 +4,16 @@ from __future__ import (absolute_import, division, print_function,
import os
import shutil
-from datetime import datetime, timedelta
-from time import sleep
import signal
-import time
-from multiprocessing import Process
import subprocess
import sys
+import time
+import zlib
+
+from datetime import datetime, timedelta
+from multiprocessing import Process
+from time import sleep
+
from unittest import skipIf
import pytest
@@ -180,10 +183,11 @@ class TestWorker(RQTestCase):
# importable from the worker process.
job = Job.create(func=div_by_zero, args=(3,))
job.save()
- data = self.testconn.hget(job.key, 'data')
- invalid_data = data.replace(b'div_by_zero', b'nonexisting')
- assert data != invalid_data
- self.testconn.hset(job.key, 'data', invalid_data)
+
+ job_data = job.data
+ invalid_data = job_data.replace(b'div_by_zero', b'nonexisting')
+ assert job_data != invalid_data
+ self.testconn.hset(job.key, 'data', zlib.compress(invalid_data))
# We use the low-level internal function to enqueue any data (bypassing
# validity checks)