summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJahn Thomas Fidje <jtfidje@gmail.com>2022-09-23 03:06:37 +0200
committerGitHub <noreply@github.com>2022-09-23 08:06:37 +0700
commit840438559261fd64b07d1bce489d352e6d553da5 (patch)
treeb5f45d98551301e299fc65ec518c87d420feba53
parent108c2ea66652737aa54038f7d71477af625b58eb (diff)
downloadrq-840438559261fd64b07d1bce489d352e6d553da5.tar.gz
Add feature to enqueue dependents at the front of queues (#1696)
* Add feature to enqueue dependents at the front of queues * Add documentation for the Dependency(enqueue_at_front=...) parameter * docs: Add `enqueue_at_front` to list of parameters for Dependency * test: Update dependency test to not rely on Redis ordering * refactor: Save enqueue_at_front boolean in job.meta instead of separate instance attr * fix: Made enqueue_at_front an instance attribute instead of putting it inside meta
-rw-r--r--docs/docs/index.md21
-rw-r--r--rq/job.py9
-rw-r--r--rq/queue.py10
-rw-r--r--tests/test_dependencies.py19
4 files changed, 47 insertions, 12 deletions
diff --git a/docs/docs/index.md b/docs/docs/index.md
index 0748846..fb2a68b 100644
--- a/docs/docs/index.md
+++ b/docs/docs/index.md
@@ -167,6 +167,7 @@ The `Dependency(jobs=...)` parameter accepts:
- a string representing a single job id
- a Job object
- an iteratable of job id strings and/or Job objects
+- `enqueue_at_front` boolean parameter to put dependents at the front when they are enqueued
Example:
@@ -177,9 +178,17 @@ from rq import Queue
queue = Queue(connection=Redis())
job_1 = queue.enqueue(div_by_zero)
-dependency = Dependency(jobs=[job_1], allow_failure=True) # allow_failure defaults to False
+dependency = Dependency(
+ jobs=[job_1],
+ allow_failure=True, # allow_failure defaults to False
+ enqueue_at_front=True # enqueue_at_front defaults to False
+)
job_2 = queue.enqueue(say_hello, depends_on=dependency)
-# job_2 will execute even though its dependency (job_1) fails
+
+"""
+ job_2 will execute even though its dependency (job_1) fails,
+ and it will be enqueued at the front of the queue.
+"""
```
@@ -269,10 +278,10 @@ There are two options:
#### Arguments:
-| | plain text | json | [literal-eval](https://docs.python.org/3/library/ast.html#ast.literal_eval) |
-|-|-|-|-|
-| keyword | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` |
-| no keyword | `[value]` | `:[value]` | `%[value]` |
+| | plain text | json | [literal-eval](https://docs.python.org/3/library/ast.html#ast.literal_eval) |
+| ---------- | --------------- | ---------------- | --------------------------------------------------------------------------- |
+| keyword | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` |
+| no keyword | `[value]` | `:[value]` | `%[value]` |
Where `[key]` is the keyword and `[value]` is the value which is parsed with the corresponding
parsing method.
diff --git a/rq/job.py b/rq/job.py
index 657eb52..0436cf4 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -39,7 +39,7 @@ class JobStatus(str, Enum):
class Dependency:
- def __init__(self, jobs, allow_failure: bool = False):
+ def __init__(self, jobs, allow_failure: bool = False, enqueue_at_front: bool = False):
jobs = ensure_list(jobs)
if not all(
isinstance(job, Job) or isinstance(job, str)
@@ -52,6 +52,7 @@ class Dependency:
self.dependencies = jobs
self.allow_failure = allow_failure
+ self.enqueue_at_front = enqueue_at_front
# Sentinel value to mark that some of our lazily evaluated properties have not
@@ -151,6 +152,7 @@ class Job:
# dependency could be job instance or id, or iterable thereof
if depends_on is not None:
if isinstance(depends_on, Dependency):
+ job.enqueue_at_front = depends_on.enqueue_at_front
job.allow_dependency_failures = depends_on.allow_failure
depends_on_list = depends_on.dependencies
else:
@@ -429,6 +431,7 @@ class Job:
self.redis_server_version = None
self.last_heartbeat = None
self.allow_dependency_failures = None
+ self.enqueue_at_front = None
def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@@ -586,6 +589,7 @@ class Job:
self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
else [dep_id.decode()] if dep_id else [])
self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None
+ self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@@ -669,6 +673,9 @@ class Job:
# convert boolean to integer to avoid redis.exception.DataError
obj["allow_dependency_failures"] = int(self.allow_dependency_failures)
+ if self.enqueue_at_front is not None:
+ obj["enqueue_at_front"] = int(self.enqueue_at_front)
+
return obj
def save(self, pipeline=None, include_meta=True):
diff --git a/rq/queue.py b/rq/queue.py
index 9b7dcbd..97b74fb 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -325,9 +325,6 @@ class Queue:
job.retries_left = retry.max
job.retry_intervals = retry.intervals
- if isinstance(depends_on, Dependency):
- job.allow_dependency_failures = depends_on.allow_failure
-
return job
def setup_dependencies(
@@ -648,16 +645,19 @@ class Queue:
break
for dependent in jobs_to_enqueue:
+ enqueue_at_front = dependent.enqueue_at_front or False
+
registry = DeferredJobRegistry(dependent.origin,
self.connection,
job_class=self.job_class,
serializer=self.serializer)
registry.remove(dependent, pipeline=pipe)
+
if dependent.origin == self.name:
- self.enqueue_job(dependent, pipeline=pipe)
+ self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
else:
queue = self.__class__(name=dependent.origin, connection=self.connection)
- queue.enqueue_job(dependent, pipeline=pipe)
+ queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
# Only delete dependents_key if all dependents have been enqueued
if len(jobs_to_enqueue) == len(dependent_job_ids):
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 12b956d..d379ed9 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -98,6 +98,25 @@ class TestDependencies(RQTestCase):
job = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
+ # Test dependant is enqueued at front
+ q.empty()
+ parent_job = q.enqueue(say_hello)
+ q.enqueue(
+ say_hello,
+ job_id='fake_job_id_1',
+ depends_on=Dependency(jobs=[parent_job])
+ )
+ q.enqueue(
+ say_hello,
+ job_id='fake_job_id_2',
+ depends_on=Dependency(jobs=[parent_job],enqueue_at_front=True)
+ )
+ #q.enqueue(say_hello) # This is a filler job that will act as a separator for jobs, one will be enqueued at front while the other one at the end of the queue
+ w.work(burst=True, max_jobs=1)
+
+ self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"])
+
+
def test_dependencies_are_met_if_parent_is_canceled(self):
"""When parent job is canceled, it should be treated as failed"""
queue = Queue(connection=self.testconn)