summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Kiryanov <22193285+coolhacker170597@users.noreply.github.com>2020-01-23 03:11:42 +0300
committerSelwin Ong <selwin.ong@gmail.com>2020-01-23 07:11:42 +0700
commited67de22c6e1d40c5f851113ad60b74eaf4f40b4 (patch)
treef941af9348e8e01a3379019b42b77b36dd7d92f4
parentccfd4a02cbc9130c845b64ca6b5b7510c99fadde (diff)
downloadrq-ed67de22c6e1d40c5f851113ad60b74eaf4f40b4.tar.gz
Add job status setting in enqueue_at (and in enqueue_in) methods (#1181)
* Add job status setting in enqueue_at (and in enqueue_in) methods Update tests for this change Closes: #1179 * Add status param to create_job func, rework enqueue_at status setting
-rw-r--r--rq/job.py11
-rw-r--r--rq/queue.py6
-rw-r--r--tests/test_job.py13
-rw-r--r--tests/test_scheduler.py5
4 files changed, 30 insertions, 5 deletions
diff --git a/rq/job.py b/rq/job.py
index 3a5c7c3..da2b674 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -173,6 +173,10 @@ class Job(object):
return self.get_status() == JobStatus.DEFERRED
@property
+ def is_scheduled(self):
+ return self.get_status() == JobStatus.SCHEDULED
+
+ @property
def _dependency_id(self):
"""Returns the first item in self._dependency_ids. Present
preserve compatibility with third party packages..
@@ -618,6 +622,13 @@ class Job(object):
job_class=self.__class__)
registry.remove(self, pipeline=pipeline)
+ elif self.is_scheduled:
+ from .registry import ScheduledJobRegistry
+ registry = ScheduledJobRegistry(self.origin,
+ connection=self.connection,
+ job_class=self.__class__)
+ registry.remove(self, pipeline=pipeline)
+
elif self.is_failed:
self.failed_job_registry.remove(self, pipeline=pipeline)
diff --git a/rq/queue.py b/rq/queue.py
index 89bbea3..273a142 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -248,14 +248,14 @@ class Queue(object):
def create_job(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
- meta=None):
+ meta=None, status=JobStatus.QUEUED):
"""Creates a job based on parameters given."""
timeout = parse_timeout(timeout) or self._default_timeout
job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
- status=JobStatus.QUEUED, description=description,
+ status=status, description=description,
depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta
)
@@ -384,7 +384,7 @@ class Queue(object):
"""Schedules a job to be enqueued at specified time"""
from .registry import ScheduledJobRegistry
- job = self.create_job(func, *args, **kwargs)
+ job = self.create_job(func, status=JobStatus.SCHEDULED, *args, **kwargs)
registry = ScheduledJobRegistry(queue=self)
with self.connection.pipeline() as pipeline:
job.save(pipeline=pipeline)
diff --git a/tests/test_job.py b/tests/test_job.py
index 8f168d2..aa0bdad 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -14,7 +14,8 @@ from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, JobStatus, cancel_job, get_current_job
from rq.queue import Queue
from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
- FinishedJobRegistry, StartedJobRegistry)
+ FinishedJobRegistry, StartedJobRegistry,
+ ScheduledJobRegistry)
from rq.utils import utcformat
from rq.worker import Worker
from tests import RQTestCase, fixtures
@@ -579,6 +580,16 @@ class TestJob(RQTestCase):
job.delete()
self.assertFalse(job in registry)
+ job = Job.create(func=fixtures.say_hello, status=JobStatus.SCHEDULED,
+ connection=self.testconn, origin='default')
+ job.save()
+
+ registry = ScheduledJobRegistry(connection=self.testconn)
+ registry.add(job, 500)
+
+ job.delete()
+ self.assertFalse(job in registry)
+
def test_job_with_dependents_delete_parent_with_saved(self):
"""job.delete() deletes itself from Redis but not dependents. If the
dependent job was saved, it will remain in redis."""
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 459fec8..88c7fa5 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -267,10 +267,13 @@ class TestQueue(RQTestCase):
scheduler = RQScheduler([queue], connection=self.testconn)
scheduler.acquire_locks()
# Jobs created using enqueue_at is put in the ScheduledJobRegistry
- queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
+ job = queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
self.assertEqual(len(queue), 0)
self.assertEqual(len(registry), 1)
+ # enqueue_at set job status to "scheduled"
+ self.assertTrue(job.get_status() == 'scheduled')
+
# After enqueue_scheduled_jobs() is called, the registry is empty
# and job is enqueued
scheduler.enqueue_scheduled_jobs()