summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Matecki <thomas.matecki@gmail.com>2019-10-21 21:34:47 -0400
committerSelwin Ong <selwin.ong@gmail.com>2019-10-22 08:34:47 +0700
commit75644ba9486ef3d8b17e8b9cf4dac91fab552375 (patch)
tree1b936c891b04bedbc59bfb786536e15b46d734d1
parentcfc02816ea9be897eac2ddd4afc794ef548b4b4b (diff)
downloadrq-75644ba9486ef3d8b17e8b9cf4dac91fab552375.tar.gz
Multi Dependency Support [Internal API Changes] (#1147)
* Convert `_dependency_id` to `_dependency_ids` Change `Job`s tracking from a single id of it's dependencies from a single _id_ to a list of _id_s. This change should be private to `Job` - especially leaving `Job#to_dict` and `Job#restore`s treatment of a single 'dependency_id' intact. This change modifies existing tests. * Remove reliance upon dependency property in tests ... use dependency.id not `_dependency_id` * Re-add assertions for Falsey Values * Add _dependency_id property For backwards compatibility with other libs such as django-rq and rq-scheduler
-rw-r--r--rq/job.py29
-rw-r--r--tests/test_decorator.py14
-rw-r--r--tests/test_job.py28
3 files changed, 46 insertions, 25 deletions
diff --git a/rq/job.py b/rq/job.py
index 7c97e20..4981f12 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -137,7 +137,7 @@ class Job(object):
# dependency could be job instance or id
if depends_on is not None:
- job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on
+ job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
return job
def get_status(self):
@@ -170,15 +170,23 @@ class Job(object):
return self.get_status() == JobStatus.DEFERRED
@property
+ def _dependency_id(self):
+ """Returns the first item in self._dependency_ids. Present
+ preserve compatibility with third party packages..
+ """
+ if self._dependency_ids:
+ return self._dependency_ids[0]
+
+ @property
def dependency(self):
"""Returns a job's dependency. To avoid repeated Redis fetches, we cache
job.dependency as job._dependency.
"""
- if self._dependency_id is None:
+ if not self._dependency_ids:
return None
if hasattr(self, '_dependency'):
return self._dependency
- job = self.fetch(self._dependency_id, connection=self.connection)
+ job = self.fetch(self._dependency_ids[0], connection=self.connection)
self._dependency = job
return job
@@ -328,7 +336,7 @@ class Job(object):
self.failure_ttl = None
self.ttl = None
self._status = None
- self._dependency_id = None
+ self._dependency_ids = []
self.meta = {}
def __repr__(self): # noqa # pragma: no cover
@@ -437,7 +445,10 @@ class Job(object):
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self._status = obj.get('status') if obj.get('status') else None
- self._dependency_id = as_text(obj.get('dependency_id', None))
+
+ dependency_id = obj.get('dependency_id', None)
+ self._dependency_ids = [as_text(dependency_id)] if dependency_id else []
+
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
@@ -497,8 +508,8 @@ class Job(object):
obj['failure_ttl'] = self.failure_ttl
if self._status is not None:
obj['status'] = self._status
- if self._dependency_id is not None:
- obj['dependency_id'] = self._dependency_id
+ if self._dependency_ids:
+ obj['dependency_id'] = self._dependency_ids[0]
if self.meta and include_meta:
obj['meta'] = dumps(self.meta)
if self.ttl:
@@ -683,7 +694,9 @@ class Job(object):
registry.add(self, pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection
- connection.sadd(self.dependents_key_for(self._dependency_id), self.id)
+ for dependency_id in self._dependency_ids:
+ dependents_key = self.dependents_key_for(dependency_id)
+ connection.sadd(dependents_key, self.id)
_job_stack = LocalStack()
diff --git a/tests/test_decorator.py b/tests/test_decorator.py
index 5b7f38e..47c4226 100644
--- a/tests/test_decorator.py
+++ b/tests/test_decorator.py
@@ -4,11 +4,11 @@ from __future__ import (absolute_import, division, print_function,
import mock
from redis import Redis
+
from rq.decorators import job
from rq.job import Job
-from rq.worker import DEFAULT_RESULT_TTL
from rq.queue import Queue
-
+from rq.worker import DEFAULT_RESULT_TTL
from tests import RQTestCase
from tests.fixtures import decorated_job
@@ -109,11 +109,12 @@ class TestDecorator(RQTestCase):
bar_job = bar.delay()
+ self.assertEqual(foo_job._dependency_ids,[])
self.assertIsNone(foo_job._dependency_id)
+ self.assertEqual(foo_job.dependency, None)
self.assertEqual(bar_job.dependency, foo_job)
-
- self.assertEqual(bar_job._dependency_id, foo_job.id)
+ self.assertEqual(bar_job.dependency.id, foo_job.id)
def test_decorator_delay_accepts_depends_on_as_argument(self):
"""Ensure that passing in depends_on to the delay method of
@@ -145,8 +146,11 @@ class TestDecorator(RQTestCase):
self.assertIsNone(foo_job._dependency_id)
self.assertIsNone(bar_job._dependency_id)
- self.assertEqual(baz_job.dependency, bar_job)
+ self.assertEqual(foo_job._dependency_ids,[])
+ self.assertEqual(bar_job._dependency_ids,[])
self.assertEqual(baz_job._dependency_id, bar_job.id)
+ self.assertEqual(baz_job.dependency, bar_job)
+ self.assertEqual(baz_job.dependency.id, bar_job.id)
@mock.patch('rq.queue.resolve_connection')
def test_decorator_connection_laziness(self, resolve_connection):
diff --git a/tests/test_job.py b/tests/test_job.py
index a990dd7..95a2358 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -2,28 +2,28 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
-from datetime import datetime
-
-import time
import sys
+import time
import zlib
-
-is_py2 = sys.version[0] == '2'
-if is_py2:
- import Queue as queue
-else:
- import queue as queue
-
-from tests import fixtures, RQTestCase
+from datetime import datetime
from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError
-from rq.job import Job, get_current_job, JobStatus, cancel_job
+from rq.job import Job, JobStatus, cancel_job, get_current_job
from rq.queue import Queue
from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry)
from rq.utils import utcformat
from rq.worker import Worker
+from tests import RQTestCase, fixtures
+
+is_py2 = sys.version[0] == '2'
+if is_py2:
+ import Queue as queue
+else:
+ import queue as queue
+
+
try:
from cPickle import loads, dumps
@@ -227,12 +227,16 @@ class TestJob(RQTestCase):
job.save()
stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._dependency_id, parent_job.id)
+ self.assertEqual(stored_job._dependency_ids, [parent_job.id])
+ self.assertEqual(stored_job.dependency.id, parent_job.id)
self.assertEqual(stored_job.dependency, parent_job)
job = Job.create(func=fixtures.some_calculation, depends_on=parent_job.id)
job.save()
stored_job = Job.fetch(job.id)
self.assertEqual(stored_job._dependency_id, parent_job.id)
+ self.assertEqual(stored_job._dependency_ids, [parent_job.id])
+ self.assertEqual(stored_job.dependency.id, parent_job.id)
self.assertEqual(stored_job.dependency, parent_job)
def test_store_then_fetch(self):