summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2019-04-14 18:18:34 +0700
committerGitHub <noreply@github.com>2019-04-14 18:18:34 +0700
commit7021cedaf9614143ddc0d8bbfb0c75f9301c7f47 (patch)
treee4c567fa04cf0c1e711d533936201e2c67c51239
parent07317b62f38b96585f86e8df090938cfc2c2ace3 (diff)
downloadrq-7021cedaf9614143ddc0d8bbfb0c75f9301c7f47.tar.gz
Implemented Job.fetch_many (#1072)
-rw-r--r--docs/docs/jobs.md10
-rw-r--r--docs/docs/workers.md1
-rw-r--r--rq/job.py67
-rw-r--r--rq/utils.py7
-rw-r--r--tests/test_job.py17
5 files changed, 76 insertions, 26 deletions
diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md
index bbea32a..f15689d 100644
--- a/docs/docs/jobs.md
+++ b/docs/docs/jobs.md
@@ -33,7 +33,15 @@ Some interesting job attributes include:
* `job.ended_at`
* `job.exc_info`
-## Accessing the "current" job
+If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`.
+
+```python
+jobs = Job.fetch_many(['foo_id', 'bar_id'], connection=redis)
+for job in jobs:
+ print('Job %s: %s' % (job.id, job.func_name))
+```
+
+## Accessing The "current" Job
Since job functions are regular Python functions, you have to ask RQ for the
current job ID, if any. To do this, you can use:
diff --git a/docs/docs/workers.md b/docs/docs/workers.md
index 4e3b3e7..739ba6b 100644
--- a/docs/docs/workers.md
+++ b/docs/docs/workers.md
@@ -60,6 +60,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
* `--url` or `-u`: URL describing Redis connection details (e.g `rq worker --url redis://:secrets@example.com:1234/9`)
* `--path` or `-P`: multiple import paths are supported (e.g `rq worker --path foo --path bar`)
* `--config` or `-c`: path to module containing RQ settings.
+* `--results-ttl`: job results will be kept for this number of seconds (defaults to 500).
* `--worker-class` or `-w`: RQ Worker class to use (e.g `rq worker --worker-class 'foo.bar.MyWorker'`)
* `--job-class` or `-j`: RQ Job class to use.
* `--queue-class`: RQ Queue class to use.
diff --git a/rq/job.py b/rq/job.py
index c411fb2..7c97e20 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -13,7 +13,8 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack
-from .utils import enum, import_attribute, utcformat, utcnow, utcparse, parse_timeout
+from .utils import (enum, import_attribute, parse_timeout, str_to_date,
+ utcformat, utcnow)
try:
import cPickle as pickle
@@ -287,6 +288,25 @@ class Job(object):
job.refresh()
return job
+ @classmethod
+ def fetch_many(cls, job_ids, connection=None):
+ """Bulk version of Job.fetch"""
+ with connection.pipeline() as pipeline:
+ for job_id in job_ids:
+ pipeline.hgetall(cls.key_for(job_id))
+ results = pipeline.execute()
+
+ jobs = []
+ for i, job_id in enumerate(job_ids):
+ if results[i]:
+ job = cls(job_id, connection=connection)
+ job.restore(results[i])
+ jobs.append(job)
+ else:
+ jobs.append(None)
+
+ return jobs
+
def __init__(self, id=None, connection=None):
self.connection = resolve_connection(connection)
self._id = id
@@ -392,24 +412,9 @@ class Job(object):
"""Backwards-compatibility accessor property `return_value`."""
return_value = result
- # Persistence
- def refresh(self): # noqa
- """Overwrite the current instance's properties with the values in the
- corresponding Redis key.
-
- Will raise a NoSuchJobError if no corresponding Redis key exists.
- """
- key = self.key
- obj = decode_redis_hash(self.connection.hgetall(key))
- if len(obj) == 0:
- raise NoSuchJobError('No such job: {0}'.format(key))
-
- def to_date(date_str):
- if date_str is None:
- return
- else:
- return utcparse(as_text(date_str))
-
+ def restore(self, raw_data):
+ """Overwrite properties with the provided values stored in Redis"""
+ obj = decode_redis_hash(raw_data)
try:
raw_data = obj['data']
except KeyError:
@@ -421,17 +426,17 @@ class Job(object):
# Fallback to uncompressed string
self.data = raw_data
- self.created_at = to_date(as_text(obj.get('created_at')))
+ self.created_at = str_to_date(obj.get('created_at'))
self.origin = as_text(obj.get('origin'))
self.description = as_text(obj.get('description'))
- self.enqueued_at = to_date(as_text(obj.get('enqueued_at')))
- self.started_at = to_date(as_text(obj.get('started_at')))
- self.ended_at = to_date(as_text(obj.get('ended_at')))
+ self.enqueued_at = str_to_date(obj.get('enqueued_at'))
+ self.started_at = str_to_date(obj.get('started_at'))
+ self.ended_at = str_to_date(obj.get('ended_at'))
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
- self.timeout = parse_timeout(as_text(obj.get('timeout'))) if obj.get('timeout') else None
+ self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
- self._status = as_text(obj.get('status') if obj.get('status') else None)
+ self._status = obj.get('status') if obj.get('status') else None
self._dependency_id = as_text(obj.get('dependency_id', None))
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
@@ -444,6 +449,18 @@ class Job(object):
# Fallback to uncompressed string
self.exc_info = as_text(raw_exc_info)
+ # Persistence
+ def refresh(self): # noqa
+ """Overwrite the current instance's properties with the values in the
+ corresponding Redis key.
+
+ Will raise a NoSuchJobError if no corresponding Redis key exists.
+ """
+ data = self.connection.hgetall(self.key)
+ if not data:
+ raise NoSuchJobError('No such job: {0}'.format(self.key))
+ self.restore(data)
+
def to_dict(self, include_meta=True):
"""
Returns a serialization of the current job instance
diff --git a/rq/utils.py b/rq/utils.py
index fb3faf6..b479ec7 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -252,6 +252,13 @@ def backend_class(holder, default_name, override=None):
return override
+def str_to_date(date_str):
+ if date_str is None:
+ return
+ else:
+ return utcparse(as_text(date_str))
+
+
def parse_timeout(timeout):
"""Transfer all kinds of timeout format to an integer representing seconds"""
if not isinstance(timeout, numbers.Integral) and timeout is not None:
diff --git a/tests/test_job.py b/tests/test_job.py
index 4e34574..a990dd7 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -183,6 +183,23 @@ class TestJob(RQTestCase):
self.assertEqual(job.kwargs, dict(z=2))
self.assertEqual(job.created_at, datetime(2012, 2, 7, 22, 13, 24, 123456))
+ def test_fetch_many(self):
+ """Fetching many jobs at once."""
+ data = {
+ 'func': fixtures.some_calculation,
+ 'args': (3, 4),
+ 'kwargs': dict(z=2),
+ 'connection': self.testconn,
+ }
+ job = Job.create(**data)
+ job.save()
+
+ job2 = Job.create(**data)
+ job2.save()
+
+ jobs = Job.fetch_many([job.id, job2.id, 'invalid_id'], self.testconn)
+ self.assertEqual(jobs, [job, job2, None])
+
def test_persistence_of_empty_jobs(self): # noqa
"""Storing empty jobs."""
job = Job()