diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2017-11-18 08:13:36 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2017-11-18 08:13:36 +0700 |
commit | e25c5dbc16333c2c976539bad48a583cd007bfb7 (patch) | |
tree | 70fe8cf97cd3f77219af61c7eac603e683f6d458 | |
parent | 8cca220cce8739bf1c2b1852830874e35bf1ebaf (diff) | |
download | rq-worker-compression.tar.gz |
job.data is now stored in compressed format.worker-compression
-rw-r--r-- | rq/job.py | 10 | ||||
-rw-r--r-- | tests/test_job.py | 31 | ||||
-rw-r--r-- | tests/test_worker.py | 20 |
3 files changed, 45 insertions, 16 deletions
@@ -417,10 +417,16 @@ class Job(object): return utcparse(as_text(date_str)) try: - self.data = obj['data'] + raw_data = obj['data'] except KeyError: raise NoSuchJobError('Unexpected job format: {0}'.format(obj)) + try: + self.data = zlib.decompress(raw_data) + except zlib.error: + # Fallback to uncompressed string + self.data = raw_data + self.created_at = to_date(as_text(obj.get('created_at'))) self.origin = as_text(obj.get('origin')) self.description = as_text(obj.get('description')) @@ -453,7 +459,7 @@ class Job(object): """ obj = {} obj['created_at'] = utcformat(self.created_at or utcnow()) - obj['data'] = self.data + obj['data'] = zlib.compress(self.data) if self.origin is not None: obj['origin'] = self.origin diff --git a/tests/test_job.py b/tests/test_job.py index d10d55e..b3512a4 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -161,7 +161,7 @@ class TestJob(RQTestCase): self.assertEqual(self.testconn.type(job.key), b'hash') # Saving writes pickled job data - unpickled_data = loads(self.testconn.hget(job.key, 'data')) + unpickled_data = loads(zlib.decompress(self.testconn.hget(job.key, 'data'))) self.assertEqual(unpickled_data[0], 'tests.fixtures.some_calculation') def test_fetch(self): @@ -237,7 +237,8 @@ class TestJob(RQTestCase): def test_fetching_unreadable_data(self): """Fetching succeeds on unreadable data, but lazy props fail.""" # Set up - job = Job.create(func=fixtures.some_calculation, args=(3, 4), kwargs=dict(z=2)) + job = Job.create(func=fixtures.some_calculation, args=(3, 4), + kwargs=dict(z=2)) job.save() # Just replace the data hkey with some random noise @@ -256,9 +257,10 @@ class TestJob(RQTestCase): # Now slightly modify the job to make it unimportable (this is # equivalent to a worker not having the most up-to-date source code # and unable to import the function) - data = self.testconn.hget(job.key, 'data') - unimportable_data = data.replace(b'say_hello', b'nay_hello') - self.testconn.hset(job.key, 'data', unimportable_data) + job_data = job.data + unimportable_data = job_data.replace(b'say_hello', b'nay_hello') + + self.testconn.hset(job.key, 'data', zlib.compress(unimportable_data)) job.refresh() with self.assertRaises(AttributeError): @@ -268,7 +270,7 @@ class TestJob(RQTestCase): """Jobs handle both compressed and uncompressed exc_info""" exception_string = 'Some exception' - job = Job.create(func=fixtures.say_hello, args=('Lionel',)) + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) job.exc_info = exception_string job.save() @@ -288,6 +290,23 @@ class TestJob(RQTestCase): job.refresh() self.assertEqual(job.exc_info, exception_string) + def test_compressed_job_data_handling(self): + """Jobs handle both compressed and uncompressed data""" + + job = Job.create(func=fixtures.say_hello, args=('Lionel',)) + job.save() + + # Job data is stored in compressed format + job_data = job.data + self.assertEqual( + zlib.compress(job_data), + self.testconn.hget(job.key, 'data') + ) + + self.testconn.hset(job.key, 'data', job_data) + job.refresh() + self.assertEqual(job.data, job_data) + def test_custom_meta_is_persisted(self): """Additional meta data on jobs are stored persisted correctly.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 4563e30..8e44d29 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,13 +4,16 @@ from __future__ import (absolute_import, division, print_function, import os import shutil -from datetime import datetime, timedelta -from time import sleep import signal -import time -from multiprocessing import Process import subprocess import sys +import time +import zlib + +from datetime import datetime, timedelta +from multiprocessing import Process +from time import sleep + from unittest import skipIf import pytest @@ -180,10 +183,11 @@ class TestWorker(RQTestCase): # importable from the worker process. job = Job.create(func=div_by_zero, args=(3,)) job.save() - data = self.testconn.hget(job.key, 'data') - invalid_data = data.replace(b'div_by_zero', b'nonexisting') - assert data != invalid_data - self.testconn.hset(job.key, 'data', invalid_data) + + job_data = job.data + invalid_data = job_data.replace(b'div_by_zero', b'nonexisting') + assert job_data != invalid_data + self.testconn.hset(job.key, 'data', zlib.compress(invalid_data)) # We use the low-level internal function to enqueue any data (bypassing # validity checks) |