summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-03-08 07:41:00 +0700
committerSelwin Ong <selwin.ong@gmail.com>2020-03-08 07:41:00 +0700
commiteba89b02555a74ce4b7c0468abdca5dd0de61d93 (patch)
tree558f8528a441cebee5b3e9f962e6531f82befbba
parent8f7dbf1b1dc392704e0be42292c8787286d962ad (diff)
downloadrq-eba89b02555a74ce4b7c0468abdca5dd0de61d93.tar.gz
enqueue_at should support explicit args and kwargsenqueue-at-args
-rw-r--r--rq/queue.py65
-rw-r--r--tests/test_queue.py13
2 files changed, 51 insertions, 27 deletions
diff --git a/rq/queue.py b/rq/queue.py
index 273a142..cff48fb 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -250,7 +250,19 @@ class Queue(object):
description=None, depends_on=None, job_id=None,
meta=None, status=JobStatus.QUEUED):
"""Creates a job based on parameters given."""
- timeout = parse_timeout(timeout) or self._default_timeout
+ timeout = parse_timeout(timeout)
+
+ if timeout is None:
+ timeout = self._default_timeout
+ elif timeout == 0:
+ raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')
+
+ result_ttl = parse_timeout(result_ttl)
+ failure_ttl = parse_timeout(failure_ttl)
+
+ ttl = parse_timeout(ttl)
+ if ttl is not None and ttl <= 0:
+ raise ValueError('Job ttl must be greater than 0')
job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection,
@@ -273,25 +285,11 @@ class Queue(object):
and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself.
"""
- timeout = parse_timeout(timeout)
-
- if timeout is None:
- timeout = self._default_timeout
- elif timeout == 0:
- raise ValueError('0 timeout is not allowed. Use -1 for infinite timeout')
- result_ttl = parse_timeout(result_ttl)
- failure_ttl = parse_timeout(failure_ttl)
-
- ttl = parse_timeout(ttl)
- if ttl is not None and ttl <= 0:
- raise ValueError('Job ttl must be greater than 0')
-
- job = self.job_class.create(
- func, args=args, kwargs=kwargs, connection=self.connection,
- result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
- description=description, depends_on=depends_on, origin=self.name,
- id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
+ job = self.create_job(
+ func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
+ failure_ttl=failure_ttl, description=description, depends_on=depends_on,
+ job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
)
# If a _dependent_ job depends on any unfinished job, register all the
@@ -338,12 +336,10 @@ class Queue(object):
job.cleanup(DEFAULT_RESULT_TTL)
return job
- def enqueue(self, f, *args, **kwargs):
- """Creates a job to represent the delayed function call and enqueues
- it.
-
- Expects the function to call, along with the arguments and keyword
- arguments.
+ @classmethod
+ def parse_args(cls, f, *args, **kwargs):
+ """
+ Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()`
The function argument `f` may be any of the following:
@@ -373,6 +369,15 @@ class Queue(object):
args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None)
+ return (f, timeout, description, result_ttl, ttl, failure_ttl,
+ depends_on, job_id, at_front, meta, args, kwargs)
+
+ def enqueue(self, f, *args, **kwargs):
+ """Creates a job to represent the delayed function call and enqueues it."""
+
+ (f, timeout, description, result_ttl, ttl, failure_ttl,
+ depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+
return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
@@ -380,11 +385,17 @@ class Queue(object):
at_front=at_front, meta=meta
)
- def enqueue_at(self, datetime, func, *args, **kwargs):
+ def enqueue_at(self, datetime, f, *args, **kwargs):
"""Schedules a job to be enqueued at specified time"""
from .registry import ScheduledJobRegistry
- job = self.create_job(func, status=JobStatus.SCHEDULED, *args, **kwargs)
+ (f, timeout, description, result_ttl, ttl, failure_ttl,
+ depends_on, job_id, at_front, meta, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
+ job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs,
+ timeout=timeout, result_ttl=result_ttl, ttl=ttl,
+ failure_ttl=failure_ttl, description=description,
+ depends_on=depends_on, job_id=job_id, meta=meta)
+
registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline:
job.save(pipeline=pipeline)
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 865e14c..ffe08c5 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -327,6 +327,19 @@ class TestQueue(RQTestCase):
((1,), {'timeout': 1, 'result_ttl': 1})
)
+ # Explicit args and kwargs should also work with enqueue_at
+ time = datetime.now(utc) + timedelta(seconds=10)
+ job = q.enqueue_at(time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
+ self.assertEqual(job.timeout, 2)
+ self.assertEqual(job.result_ttl, 2)
+ self.assertEqual(
+ job.perform(),
+ ((1,), {'timeout': 1, 'result_ttl': 1})
+ )
+
+ # Positional arguments is not allowed if explicit args and kwargs are used
+ self.assertRaises(Exception, q.enqueue, echo, 1, kwargs=kwargs)
+
def test_all_queues(self):
"""All queues"""
q1 = Queue('first-queue')