summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-11-08 21:02:21 +0700
committerGitHub <noreply@github.com>2020-11-08 21:02:21 +0700
commitf3e924cdd160cb99f138782b0c4a67620184e0a2 (patch)
tree4f5348549192b681a34b363653f06283321a9bd5
parent3ead30a34efa31e082dade5ebfa130ac88c48f31 (diff)
downloadrq-f3e924cdd160cb99f138782b0c4a67620184e0a2.tar.gz
Added job.worker_name (#1375)
* Added job.worker_name * Fix compatibility with Redis server 3.x * Document job.worker_name * Removed some Python 2 compatibility stuff. * Remove unused codes
-rw-r--r--docs/docs/jobs.md1
-rw-r--r--rq/compat/__init__.py68
-rw-r--r--rq/job.py26
-rw-r--r--rq/queue.py7
-rw-r--r--rq/registry.py6
-rw-r--r--rq/worker.py37
-rw-r--r--tests/fixtures.py7
-rw-r--r--tests/test_cli.py5
-rw-r--r--tests/test_job.py33
-rw-r--r--tests/test_queue.py9
-rw-r--r--tests/test_scheduler.py67
-rw-r--r--tests/test_worker.py24
12 files changed, 122 insertions, 168 deletions
diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md
index 7ede815..3bc1ab0 100644
--- a/docs/docs/jobs.md
+++ b/docs/docs/jobs.md
@@ -122,6 +122,7 @@ Some interesting job attributes include:
* `job.ended_at`
* `job.exc_info` stores exception information if job doesn't finish successfully.
* `job.last_heartbeat` the latest timestamp that's periodically updated when the job is executing. Can be used to determine if the job is still active.
+* `job.worker_name` returns the worker name currently executing this job.
If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`.
diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py
index 0871dec..fee9c3f 100644
--- a/rq/compat/__init__.py
+++ b/rq/compat/__init__.py
@@ -45,62 +45,22 @@ except ImportError:
PY2 = sys.version_info[0] == 2
-if not PY2:
- # Python 3.x and up
- text_type = str
- string_types = (str,)
- def as_text(v):
- if v is None:
- return None
- elif isinstance(v, bytes):
- return v.decode('utf-8')
- elif isinstance(v, str):
- return v
- else:
- raise ValueError('Unknown type %r' % type(v))
+# Python 3.x and up
+text_type = str
+string_types = (str,)
- def decode_redis_hash(h):
- return dict((as_text(k), h[k]) for k in h)
-else:
- # Python 2.x
- def text_type(v):
- try:
- return unicode(v) # noqa
- except Exception:
- return unicode(v, "utf-8", errors="ignore") # noqa
- string_types = (str, unicode) # noqa
+def as_text(v):
+ if v is None:
+ return None
+ elif isinstance(v, bytes):
+ return v.decode('utf-8')
+ elif isinstance(v, str):
+ return v
+ else:
+ raise ValueError('Unknown type %r' % type(v))
- def as_text(v):
- if v is None:
- return None
- elif isinstance(v, str):
- return v.decode('utf-8')
- elif isinstance(v, unicode): # noqa
- return v
- else:
- raise Exception("Input cannot be decoded into literal thing.")
- def decode_redis_hash(h):
- return h
-
-
-try:
- from datetime import timezone
- utc = timezone.utc
-except ImportError:
- # Python 2.x workaround
- from datetime import timedelta, tzinfo
-
- class UTC(tzinfo):
- def utcoffset(self, dt):
- return timedelta(0)
-
- def tzname(self, dt):
- return "UTC"
-
- def dst(self, dt):
- return timedelta(0)
-
- utc = UTC()
+def decode_redis_hash(h):
+ return dict((as_text(k), h[k]) for k in h)
diff --git a/rq/job.py b/rq/job.py
index 868a44b..0f16d3f 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -343,6 +343,7 @@ class Job(object):
self.result_ttl = None
self.failure_ttl = None
self.ttl = None
+ self.worker_name = None
self._status = None
self._dependency_ids = []
self.meta = {}
@@ -479,6 +480,7 @@ class Job(object):
self.created_at = str_to_date(obj.get('created_at'))
self.origin = as_text(obj.get('origin'))
+ self.worker_name = obj.get('worker_name').decode() if obj.get('worker_name') else None
self.description = as_text(obj.get('description'))
self.enqueued_at = str_to_date(obj.get('enqueued_at'))
self.started_at = str_to_date(obj.get('started_at'))
@@ -500,7 +502,7 @@ class Job(object):
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
-
+
self.retries_left = int(obj.get('retries_left')) if obj.get('retries_left') else None
if obj.get('retry_intervals'):
self.retry_intervals = json.loads(obj.get('retry_intervals').decode())
@@ -538,8 +540,9 @@ class Job(object):
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '',
+ 'worker_name': self.worker_name or ''
}
-
+
if self.retries_left is not None:
obj['retries_left'] = self.retries_left
if self.retry_intervals is not None:
@@ -554,7 +557,7 @@ class Job(object):
if self._result is not None:
try:
obj['result'] = self.serializer.dumps(self._result)
- except Exception as e:
+ except: # noqa
obj['result'] = "Unserializable return value"
if self.exc_info is not None:
obj['exc_info'] = zlib.compress(str(self.exc_info).encode('utf-8'))
@@ -694,6 +697,23 @@ class Job(object):
assert self is _job_stack.pop()
return self._result
+ def prepare_for_execution(self, worker_name, pipeline):
+ """Set job metadata before execution begins"""
+ self.worker_name = worker_name
+ self.last_heartbeat = utcnow()
+ self.started_at = self.last_heartbeat
+ self._status = JobStatus.STARTED
+ mapping = {
+ 'last_heartbeat': utcformat(self.last_heartbeat),
+ 'status': self._status,
+ 'started_at': utcformat(self.started_at),
+ 'worker_name': worker_name
+ }
+ if self.get_redis_server_version() >= StrictVersion("4.0.0"):
+ pipeline.hset(self.key, mapping=mapping)
+ else:
+ pipeline.hmset(self.key, mapping)
+
def _execute(self):
return self.func(*self.args, **self.kwargs)
diff --git a/rq/queue.py b/rq/queue.py
index e470dfe..5e98bcd 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -4,12 +4,13 @@ from __future__ import (absolute_import, division, print_function,
import uuid
import warnings
-from datetime import datetime
+
+from datetime import datetime, timezone
from distutils.version import StrictVersion
from redis import WatchError
-from .compat import as_text, string_types, total_ordering, utc
+from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
@@ -451,7 +452,7 @@ nd
def enqueue_in(self, time_delta, func, *args, **kwargs):
"""Schedules a job to be executed in a given `timedelta` object"""
- return self.enqueue_at(datetime.now(utc) + time_delta,
+ return self.enqueue_at(datetime.now(timezone.utc) + time_delta,
func, *args, **kwargs)
def enqueue_job(self, job, pipeline=None, at_front=False):
diff --git a/rq/registry.py b/rq/registry.py
index 5a5c078..76cf96e 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -1,8 +1,8 @@
import calendar
import time
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
-from .compat import as_text, utc
+from .compat import as_text
from .connections import resolve_connection
from .defaults import DEFAULT_FAILURE_TTL
from .exceptions import InvalidJobOperation, NoSuchJobError
@@ -303,7 +303,7 @@ class ScheduledJobRegistry(BaseRegistry):
if not score:
raise NoSuchJobError
- return datetime.fromtimestamp(score, tz=utc)
+ return datetime.fromtimestamp(score, tz=timezone.utc)
def clean_registries(queue):
diff --git a/rq/worker.py b/rq/worker.py
index 3b4aade..cbe7073 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -26,7 +26,7 @@ from redis import WatchError
from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE
-from .compat import PY2, as_text, string_types, text_type
+from .compat import as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (DEFAULT_RESULT_TTL,
@@ -859,9 +859,7 @@ class Worker(object):
registry = StartedJobRegistry(job.origin, self.connection,
job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline)
- job.set_status(JobStatus.STARTED, pipeline=pipeline)
- job.heartbeat(utcnow(), pipeline=pipeline)
- pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
+ job.prepare_for_execution(self.name, pipeline=pipeline)
pipeline.execute()
msg = 'Processing {0} from {1} since {2}'
@@ -883,7 +881,7 @@ class Worker(object):
self.connection,
job_class=self.job_class
)
-
+ job.worker_name = None
# Requeue/reschedule if retry is configured
if job.retries_left and job.retries_left > 0:
retry = True
@@ -943,6 +941,7 @@ class Worker(object):
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
+ job.worker_name = None
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
@@ -985,9 +984,7 @@ class Worker(object):
except: # NOQA
job.ended_at = utcnow()
exc_info = sys.exc_info()
- exc_string = self._get_safe_exception_string(
- traceback.format_exception(*exc_info)
- )
+ exc_string = ''.join(traceback.format_exception(*exc_info))
self.handle_job_failure(job=job, exc_string=exc_string, queue=queue,
started_job_registry=started_job_registry)
self.handle_exception(job, *exc_info)
@@ -1014,9 +1011,7 @@ class Worker(object):
def handle_exception(self, job, *exc_info):
"""Walks the exception handler stack to delegate exception handling."""
- exc_string = Worker._get_safe_exception_string(
- traceback.format_exception(*exc_info)
- )
+ exc_string = ''.join(traceback.format_exception(*exc_info))
self.log.error(exc_string, exc_info=True, extra={
'func': job.func_name,
'arguments': job.args,
@@ -1037,16 +1032,6 @@ class Worker(object):
if not fallthrough:
break
- @staticmethod
- def _get_safe_exception_string(exc_strings):
- """Ensure list of exception strings is decoded on Python 2 and joined as one string safely."""
- if sys.version_info[0] < 3:
- try:
- exc_strings = [exc.decode("utf-8") for exc in exc_strings]
- except ValueError:
- exc_strings = [exc.decode("latin-1") for exc in exc_strings]
- return ''.join(exc_strings)
-
def push_exc_handler(self, handler_func):
"""Pushes an exception handler onto the exc handler stack."""
self._exc_handlers.append(handler_func)
@@ -1101,16 +1086,14 @@ class Worker(object):
class SimpleWorker(Worker):
- def main_work_horse(self, *args, **kwargs):
- raise NotImplementedError("Test worker does not implement this method")
def execute_job(self, job, queue):
"""Execute job in same thread/process, do not fork()"""
# "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. We should just stick to DEFAULT_WORKER_TTL.
if job.timeout == -1:
- timeout = DEFAULT_WORKER_TTL
+ timeout = DEFAULT_WORKER_TTL
else:
- timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60
+ timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60
return self.perform_job(job, queue, heartbeat_ttl=timeout)
@@ -1124,10 +1107,6 @@ class HerokuWorker(Worker):
imminent_shutdown_delay = 6
frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace']
- if PY2:
- frame_properties.extend(
- ['f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_restricted']
- )
def setup_work_horse_signals(self):
"""Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN"""
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 5d8e8bc..e50ecb0 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -14,7 +14,7 @@ import subprocess
from rq import Connection, get_current_job, get_current_connection, Queue
from rq.decorators import job
-from rq.compat import PY2, text_type
+from rq.compat import text_type
from rq.worker import HerokuWorker
@@ -120,10 +120,7 @@ class CallableObject(object):
class UnicodeStringObject(object):
def __repr__(self):
- if PY2:
- return u'é'.encode('utf-8')
- else:
- return u'é'
+ return u'é'
with Connection():
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 1dea1e3..51d4e60 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -2,13 +2,12 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
-from datetime import datetime
+from datetime import datetime, timezone
from click.testing import CliRunner
from redis import Redis
from rq import Queue
-from rq.compat import utc
from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig
from rq.job import Job
@@ -250,7 +249,7 @@ class TestRQCli(RQTestCase):
def test_worker_with_scheduler(self):
"""rq worker -u <url> --with-scheduler"""
queue = Queue(connection=self.connection)
- queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
+ queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
registry = ScheduledJobRegistry(queue=queue)
runner = CliRunner()
diff --git a/tests/test_job.py b/tests/test_job.py
index 2987c5b..f990afd 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -10,7 +10,7 @@ from datetime import datetime, timedelta
from redis import WatchError
-from rq.compat import PY2, as_text
+from rq.compat import as_text
from rq.exceptions import NoSuchJobError
from rq.job import Job, JobStatus, cancel_job, get_current_job, Retry
from rq.queue import Queue
@@ -32,18 +32,9 @@ class TestJob(RQTestCase):
args=[12, "☃"],
kwargs=dict(snowman="☃", null=None),
)
-
- if not PY2:
- # Python 3
- expected_string = "myfunc(12, '☃', null=None, snowman='☃')"
- else:
- # Python 2
- expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode(
- 'utf-8')
-
self.assertEqual(
job.description,
- expected_string,
+ "myfunc(12, '☃', null=None, snowman='☃')",
)
def test_create_empty_job(self):
@@ -222,7 +213,7 @@ class TestJob(RQTestCase):
# ... and no other keys are stored
self.assertEqual(
sorted(self.testconn.hkeys(job.key)),
- [b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at'])
+ [b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at', b'worker_name'])
self.assertEqual(job.last_heartbeat, None)
self.assertEqual(job.last_heartbeat, None)
@@ -439,10 +430,20 @@ class TestJob(RQTestCase):
job = Job.create(func=fixtures.say_hello, args=('Lionel',))
job.save()
Job.fetch(job.id, connection=self.testconn)
- if PY2:
- self.assertEqual(job.description, "tests.fixtures.say_hello(u'Lionel')")
- else:
- self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
+ self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
+
+ def test_prepare_for_execution(self):
+ """job.prepare_for_execution works properly"""
+ job = Job.create(func=fixtures.say_hello)
+ job.save()
+ with self.testconn.pipeline() as pipeline:
+ job.prepare_for_execution("worker_name", pipeline)
+ pipeline.execute()
+ job.refresh()
+ self.assertEqual(job.worker_name, "worker_name")
+ self.assertEqual(job.get_status(), JobStatus.STARTED)
+ self.assertIsNotNone(job.last_heartbeat)
+ self.assertIsNotNone(job.started_at)
def test_job_access_outside_job_fails(self):
"""The current job is accessible only within a job context."""
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 6425c87..9b61b5a 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -3,13 +3,10 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import json
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from mock.mock import patch
from rq import Retry, Queue
-from rq.compat import utc
-from rq.exceptions import NoSuchJobError
-
from rq.job import Job, JobStatus
from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, ScheduledJobRegistry,
@@ -364,7 +361,7 @@ class TestQueue(RQTestCase):
)
# Explicit args and kwargs should also work with enqueue_at
- time = datetime.now(utc) + timedelta(seconds=10)
+ time = datetime.now(timezone.utc) + timedelta(seconds=10)
job = q.enqueue_at(time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
self.assertEqual(job.timeout, 2)
self.assertEqual(job.result_ttl, 2)
@@ -671,7 +668,7 @@ class TestJobScheduling(RQTestCase):
def test_enqueue_at(self):
"""enqueue_at() creates a job in ScheduledJobRegistry"""
queue = Queue(connection=self.testconn)
- scheduled_time = datetime.now(utc) + timedelta(seconds=10)
+ scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=10)
job = queue.enqueue_at(scheduled_time, say_hello)
registry = ScheduledJobRegistry(queue=queue)
self.assertIn(job, registry)
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index b1de0e7..348236c 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1,11 +1,11 @@
import os
import time
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from multiprocessing import Process
from rq import Queue
-from rq.compat import utc, PY2
+from rq.compat import PY2
from rq.exceptions import NoSuchJobError
from rq.job import Job, Retry
from rq.registry import FinishedJobRegistry, ScheduledJobRegistry
@@ -57,8 +57,8 @@ class TestScheduledJobRegistry(RQTestCase):
job = Job.create('myfunc', connection=self.testconn)
job.save()
- dt = datetime(2019, 1, 1, tzinfo=utc)
- registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
+ dt = datetime(2019, 1, 1, tzinfo=timezone.utc)
+ registry.schedule(job, datetime(2019, 1, 1, tzinfo=timezone.utc))
self.assertEqual(registry.get_scheduled_time(job), dt)
# get_scheduled_time() should also work with job ID
self.assertEqual(registry.get_scheduled_time(job.id), dt)
@@ -74,35 +74,28 @@ class TestScheduledJobRegistry(RQTestCase):
job.save()
registry = ScheduledJobRegistry(queue=queue)
- if PY2:
- # On Python 2, datetime needs to have timezone
- self.assertRaises(ValueError, registry.schedule, job, datetime(2019, 1, 1))
- registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
+ from datetime import timezone
+ # If we pass in a datetime with no timezone, `schedule()`
+ # assumes local timezone so depending on your local timezone,
+ # the timestamp maybe different
+
+ # we need to account for the difference between a timezone
+ # with DST active and without DST active. The time.timezone
+ # property isn't accurate when time.daylight is non-zero,
+ # we'll test both.
+
+ # first, time.daylight == 0 (not in DST).
+ # mock the sitatuoin for American/New_York not in DST (UTC - 5)
+ # time.timezone = 18000
+ # time.daylight = 0
+ # time.altzone = 14400
+ mock_day = mock.patch('time.daylight', 0)
+ mock_tz = mock.patch('time.timezone', 18000)
+ mock_atz = mock.patch('time.altzone', 14400)
+ with mock_tz, mock_day, mock_atz:
+ registry.schedule(job, datetime(2019, 1, 1))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
- 1546300800) # 2019-01-01 UTC in Unix timestamp
- else:
- from datetime import timezone
- # If we pass in a datetime with no timezone, `schedule()`
- # assumes local timezone so depending on your local timezone,
- # the timestamp maybe different
-
- # we need to account for the difference between a timezone
- # with DST active and without DST active. The time.timezone
- # property isn't accurate when time.daylight is non-zero,
- # we'll test both.
-
- # first, time.daylight == 0 (not in DST).
- # mock the sitatuoin for American/New_York not in DST (UTC - 5)
- # time.timezone = 18000
- # time.daylight = 0
- # time.altzone = 14400
- mock_day = mock.patch('time.daylight', 0)
- mock_tz = mock.patch('time.timezone', 18000)
- mock_atz = mock.patch('time.altzone', 14400)
- with mock_tz, mock_day, mock_atz:
- registry.schedule(job, datetime(2019, 1, 1))
- self.assertEqual(self.testconn.zscore(registry.key, job.id),
- 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp
+ 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp
# second, time.daylight != 0 (in DST)
# mock the sitatuoin for American/New_York not in DST (UTC - 4)
@@ -227,7 +220,7 @@ class TestScheduler(RQTestCase):
registry = ScheduledJobRegistry(queue=queue)
job = Job.create('myfunc', connection=self.testconn)
job.save()
- registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
+ registry.schedule(job, datetime(2019, 1, 1, tzinfo=timezone.utc))
scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks()
scheduler.enqueue_scheduled_jobs()
@@ -237,7 +230,7 @@ class TestScheduler(RQTestCase):
self.assertEqual(len(registry), 0)
# Jobs scheduled in the far future should not be affected
- registry.schedule(job, datetime(2100, 1, 1, tzinfo=utc))
+ registry.schedule(job, datetime(2100, 1, 1, tzinfo=timezone.utc))
scheduler.enqueue_scheduled_jobs()
self.assertEqual(len(queue), 1)
@@ -294,7 +287,7 @@ class TestWorker(RQTestCase):
p = Process(target=kill_worker, args=(os.getpid(), False, 5))
p.start()
- queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
+ queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
worker.work(burst=False, with_scheduler=True)
p.join(1)
self.assertIsNotNone(worker.scheduler)
@@ -311,7 +304,7 @@ class TestQueue(RQTestCase):
scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks()
# Jobs created using enqueue_at is put in the ScheduledJobRegistry
- job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
+ job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=timezone.utc), say_hello)
self.assertEqual(len(queue), 0)
self.assertEqual(len(registry), 1)
@@ -330,7 +323,7 @@ class TestQueue(RQTestCase):
registry = ScheduledJobRegistry(queue=queue)
job = queue.enqueue_in(timedelta(seconds=30), say_hello)
- now = datetime.now(utc)
+ now = datetime.now(timezone.utc)
scheduled_time = registry.get_scheduled_time(job)
# Ensure that job is scheduled roughly 30 seconds from now
self.assertTrue(
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 1ab5865..bf11b92 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -181,6 +181,7 @@ class TestWorker(RQTestCase):
'Expected at least some work done.'
)
self.assertEqual(job.result, 'Hi there, Frank!')
+ self.assertIsNone(job.worker_name)
def test_job_times(self):
"""job times are set correctly."""
@@ -296,7 +297,7 @@ class TestWorker(RQTestCase):
enqueued_at_date = str(job.enqueued_at)
w = Worker([q])
- w.work(burst=True) # should silently pass
+ w.work(burst=True)
# Postconditions
self.assertEqual(q.count, 0)
@@ -307,6 +308,7 @@ class TestWorker(RQTestCase):
# Check the job
job = Job.fetch(job.id)
self.assertEqual(job.origin, q.name)
+ self.assertIsNone(job.worker_name) # Worker name is cleared after failures
# Should be the original enqueued_at date, not the date of enqueueing
# to the failed queue
@@ -373,7 +375,7 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.failed_job_count, 2)
self.assertEqual(worker.successful_job_count, 2)
self.assertEqual(worker.total_working_time, 3.0)
-
+
def test_handle_retry(self):
"""handle_job_failure() handles retry properly"""
connection = self.testconn
@@ -409,7 +411,7 @@ class TestWorker(RQTestCase):
self.assertEqual([], queue.job_ids)
# If a job is no longer retries, it's put in FailedJobRegistry
self.assertTrue(job in registry)
-
+
def test_retry_interval(self):
"""Retries with intervals are scheduled"""
connection = self.testconn
@@ -586,26 +588,26 @@ class TestWorker(RQTestCase):
q = Queue()
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w = Worker([q])
- self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
+ self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertNotEqual(self.testconn.ttl(job.key), 0)
- self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
+ self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
# Job with -1 result_ttl don't expire
job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
w = Worker([q])
- self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
+ self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertEqual(self.testconn.ttl(job.key), -1)
- self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
+ self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
# Job with result_ttl = 0 gets deleted immediately
job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)
w = Worker([q])
- self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
+ self.assertIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertEqual(self.testconn.get(job.key), None)
- self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
+ self.assertNotIn(job.get_id().encode(), self.testconn.lrange(q.key, 0, -1))
def test_worker_sets_job_status(self):
"""Ensure that worker correctly sets job status."""
@@ -736,6 +738,10 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.get_state(), 'busy')
self.assertEqual(worker.get_current_job_id(), job.id)
+ # job status is also updated
+ self.assertEqual(job._status, JobStatus.STARTED)
+ self.assertEqual(job.worker_name, worker.name)
+
def test_prepare_job_execution_inf_timeout(self):
"""Prepare job execution handles infinite job timeout"""
queue = Queue(connection=self.testconn)