summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-01-04 10:14:52 +0700
committerGitHub <noreply@github.com>2020-01-04 10:14:52 +0700
commitbaa0cc268adff31f22e0278e2c76cf26c5a2d14c (patch)
treeea2f95f9894e153c50464f898decf12fd834dc4e
parentf09d4db080728db6dc301043f26037d82b74501f (diff)
downloadrq-baa0cc268adff31f22e0278e2c76cf26c5a2d14c.tar.gz
Job scheduling (#1163)
* First RQScheduler prototype * WIP job scheduling * Fixed Python 2.7 tests * Added ScheduledJobRegistry.get_scheduled_time(job) * WIP on scheduler's threading mechanism * Fixed test errors * Changed scheduler.acquire_locks() to instance method * Added scheduler.prepare_registries() * Somewhat working implementation of RQ scheduler * Only call stop_scheduler if there's a scheduler present * Use OSError rather than ProcessLookupError for PyPy compatibility * Added `auto_start` argument to scheduler.acquire_locks() * Make RQScheduler play better with timezone * Fixed test error * Added --with-scheduler flag to rq worker CLI * Fix tests on Python 2.x * More Python 2 fixes * Only call `scheduler.start` if worker is run in non burst mode * Fixed an issue where running worker with scheduler would fail sometimes * Make `worker.stop_scheduler()` more resilient to errors * worker.dequeue_job_and_maintain_ttl() should also periodically run maintenance tasks * Scheduler can now work with worker in both burst and non burst mode * Fixed scheduler logging message * Always log scheduler errors when running * Improve scheduler error logging message * Removed testing code * Scheduler should periodically try to acquire locks for other queues it doesn't have * Added tests for scheduler.should_reacquire_locks * Added queue.enqueue_in() * Fixes queue.enqueue_in() in Python 2.7 * First stab at documenting job scheduling * Remove unused methods * Remove Python 2.6 logging compatibility code * Remove more unused imports * Added convenience methods to access job registries from queue * Added test for worker.run_maintenance_tasks() * Simplify worker.queue_names() and worker.queue_keys() * Updated changelog to mention RQ's new job scheduling mechanism.
-rw-r--r--CHANGES.md3
-rw-r--r--dev-requirements.txt2
-rw-r--r--docs/docs/job_registries.md20
-rw-r--r--docs/docs/scheduling.md114
-rwxr-xr-xrq/cli/cli.py9
-rw-r--r--rq/compat/__init__.py20
-rw-r--r--rq/job.py9
-rw-r--r--rq/queue.py76
-rw-r--r--rq/registry.py78
-rw-r--r--rq/scheduler.py197
-rw-r--r--rq/utils.py5
-rw-r--r--rq/worker.py75
-rw-r--r--tests/fixtures.py11
-rw-r--r--tests/test_cli.py22
-rw-r--r--tests/test_queue.py33
-rw-r--r--tests/test_registry.py14
-rw-r--r--tests/test_scheduler.py291
-rw-r--r--tests/test_worker.py21
18 files changed, 943 insertions, 57 deletions
diff --git a/CHANGES.md b/CHANGES.md
index b0fe541..c103cbf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,4 +1,5 @@
-### RQ 1.2.0 (Unreleased)
+### RQ 1.2.0 (2020-01-04)
+* This release also contains an alpha version of RQ's builtin job scheduling mechanism. Thanks @selwin!
* Various internal API changes in preparation to support multiple job dependencies. Thanks @thomasmatecki!
* `--verbose` or `--quiet` CLI arguments should override `--logging-level`. Thanks @zyt312074545!
* Fixes a bug in `rq info` where it doesn't show workers for empty queues. Thanks @zyt312074545!
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 93253de..615fac9 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,2 +1,2 @@
mock
-pytest
+pytest \ No newline at end of file
diff --git a/docs/docs/job_registries.md b/docs/docs/job_registries.md
index cbd739c..6e29c57 100644
--- a/docs/docs/job_registries.md
+++ b/docs/docs/job_registries.md
@@ -10,6 +10,7 @@ executed and removed right after completion (success or failure).
* `FailedJobRegistry` Holds jobs that have been executed, but didn't finish successfully.
* `DeferredJobRegistry` Holds deferred jobs (jobs that depend on another job and are waiting for that
job to finish).
+* `ScheduledJobRegistry` Holds schedduled jobs.
You can get the number of jobs in a registry, the ids of the jobs in the registry, and more.
Below is an example using a `StartedJobRegistry`.
@@ -44,6 +45,24 @@ print('Job in registry %s' % (job in registry))
print('Job in registry %s' % (job.id in registry))
```
+_New in version 1.2.0_
+
+You can quickly access job registries from `Queue` objects.
+
+```python
+from redis import Redis
+from rq import Queue
+
+redis = Redis()
+queue = Queue(connection=redis)
+
+queue.started_job_registry # Returns StartedJobRegistry
+queue.deferred_job_registry # Returns DeferredJobRegistry
+queue.finished_job_registry # Returns FinishedJobRegistry
+queue.failed_job_registry # Returns FailedJobRegistry
+queue.scheduled_job_registry # Returns ScheduledobRegistry
+```
+
## Removing Jobs
_New in version 1.2.0_
@@ -69,5 +88,4 @@ for job_id in registry.get_job_ids():
# use `delete_job=True`
for job_id in registry.get_job_ids():
registry.remove(job_id, delete_job=True)
-
``` \ No newline at end of file
diff --git a/docs/docs/scheduling.md b/docs/docs/scheduling.md
new file mode 100644
index 0000000..11fb7f1
--- /dev/null
+++ b/docs/docs/scheduling.md
@@ -0,0 +1,114 @@
+---
+title: "RQ: Scheduling Jobs"
+layout: docs
+---
+
+_New in version 1.2.0._
+
+This builtin version of `RQScheduler` is still in alpha, use at your own risk!
+
+If you need a battle tested version of RQ job scheduling, please take a look at
+https://github.com/rq/rq-scheduler instead.
+
+New in RQ 1.2.0 is `RQScheduler`, a built-in component that allows you to schedule jobs
+for future execution.
+
+This component is developed based on prior experience of developing the external
+`rq-scheduler` library. The goal of taking this component in house is to allow
+RQ to have job scheduling capabilities without:
+1. Running a separate `rqscheduler` CLI command.
+2. Worrying about a separate `Scheduler` class.
+
+
+# Scheduling Jobs for Execution
+
+There are two main APIs to schedule jobs for execution, `enqueue_at()` and `enqueue_in()`.
+
+`queue.enqueue_at()` works almost like `queue.enqueue()`, except that it expects a datetime
+for its first argument.
+
+```python
+from datetime import datetime
+from rq import Queue
+from redis import Redis
+from somewhere import say_hello
+
+queue = Queue(name='default', connection=Redis())
+
+# Schedules job to be run at 9:15, October 10th in the local timezone
+job = queue.enqueue_at(datetime(2019, 10, 8, 9, 15), say_hello)
+```
+
+Note that if you pass in a naive datetime object, RQ will automatically convert it
+to the local timezone.
+
+`queue.enqueue_in()` accepts a `timedelta` as its first argument.
+
+```python
+from datetime import timedelta
+from rq import Queue
+from redis import Redis
+from somewhere import say_hello
+
+queue = Queue(name='default', connection=Redis())
+
+# Schedules job to be run in 10 seconds
+job = queue.enqueue_at(timedelta(seconds=10), say_hello)
+```
+
+Jobs that are scheduled for execution are not placed in the queue, but they are
+stored in `ScheduledJobRegistry`.
+
+```python
+from datetime import timedelta
+from redis import Redis
+
+from rq import Queue
+from rq.registry import ScheduledJobRegistry
+
+redis = Redis()
+
+queue = Queue(name='default', connection=redis)
+job = queue.enqueue_in(timedelta(seconds=10), say_nothing)
+print(job in queue) # Outputs False as job is not enqueued
+
+registry = ScheduledJobRegistry(queue=queue)
+print(job in registry) # Outputs True as job is placed in ScheduledJobRegistry
+```
+
+# Running the Scheduler
+
+If you use RQ's scheduling features, you need to run RQ workers with the
+scheduler component enabled.
+
+```console
+$ rq worker --with-scheduler
+```
+
+You can also run a worker with scheduler enabled in a programmatic way.
+
+```python
+from rq import Worker, Queue
+from redis import Redis
+
+redis = Redis()
+
+queue = Queue(connection=redis)
+worker = Worker(queues=[queue], connection=redis)
+worker.work(with_scheduler=True)
+```
+
+Only a single scheduler can run for a specific queue at any one time. If you run multiple
+workers with scheduler enabled, only one scheduler will be actively working for a given queue.
+
+Active schedulers are responsible for enqueueing scheduled jobs. Active schedulers will check for
+scheduled jobs once every second.
+
+Idle schedulers will periodically (every 15 minutes) check whether the queues they're
+responsible for have active schedulers. If they don't, one of the idle schedulers will start
+working. This way, if a worker with active scheduler dies, the scheduling work will be picked
+up by other workers with the scheduling component enabled.
+
+
+
+
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index fb2ab0b..a5dad27 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -200,12 +200,13 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
+@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, job_monitoring_interval, disable_job_desc_logging, verbose, quiet, sentry_dsn,
- exception_handler, pid, disable_default_exception_handler, max_jobs, queues,
- log_format, date_format, **options):
+ exception_handler, pid, disable_default_exception_handler, max_jobs, with_scheduler,
+ queues, log_format, date_format, **options):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
@@ -253,7 +254,9 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
if verbose or quiet:
logging_level = None
- worker.work(burst=burst, logging_level=logging_level, date_format=date_format, log_format=log_format, max_jobs=max_jobs)
+ worker.work(burst=burst, logging_level=logging_level,
+ date_format=date_format, log_format=log_format,
+ max_jobs=max_jobs, with_scheduler=with_scheduler)
except ConnectionError as e:
print(e)
sys.exit(1)
diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py
index 81f4aff..3f8b3aa 100644
--- a/rq/compat/__init__.py
+++ b/rq/compat/__init__.py
@@ -84,3 +84,23 @@ else:
def decode_redis_hash(h):
return h
+
+
+try:
+ from datetime import timezone
+ utc = timezone.utc
+except ImportError:
+ # Python 2.x workaround
+ from datetime import timedelta, tzinfo
+
+ class UTC(tzinfo):
+ def utcoffset(self, dt):
+ return timedelta(0)
+
+ def tzname(self, dt):
+ return "UTC"
+
+ def dst(self, dt):
+ return timedelta(0)
+
+ utc = UTC() \ No newline at end of file
diff --git a/rq/job.py b/rq/job.py
index daf08a7..3a5c7c3 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -33,7 +33,8 @@ JobStatus = enum(
FINISHED='finished',
FAILED='failed',
STARTED='started',
- DEFERRED='deferred'
+ DEFERRED='deferred',
+ SCHEDULED='scheduled',
)
# Sentinel value to mark that some of our lazily evaluated properties have not
@@ -128,9 +129,9 @@ class Job(object):
# Extra meta data
job.description = description or job.get_call_string()
- job.result_ttl = result_ttl
- job.failure_ttl = failure_ttl
- job.ttl = ttl
+ job.result_ttl = parse_timeout(result_ttl)
+ job.failure_ttl = parse_timeout(failure_ttl)
+ job.ttl = parse_timeout(ttl)
job.timeout = parse_timeout(timeout)
job._status = status
job.meta = meta or {}
diff --git a/rq/queue.py b/rq/queue.py
index 135a99a..89bbea3 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -5,13 +5,14 @@ from __future__ import (absolute_import, division, print_function,
import uuid
import warnings
+from datetime import datetime
+
from redis import WatchError
-from .compat import as_text, string_types, total_ordering
+from .compat import as_text, string_types, total_ordering, utc
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
-from .exceptions import (DequeueTimeout, InvalidJobDependency, NoSuchJobError,
- UnpickleError)
+from .exceptions import DequeueTimeout, NoSuchJobError, UnpickleError
from .job import Job, JobStatus
from .utils import backend_class, import_attribute, parse_timeout, utcnow
@@ -184,7 +185,31 @@ class Queue(object):
def failed_job_registry(self):
"""Returns this queue's FailedJobRegistry."""
from rq.registry import FailedJobRegistry
- return FailedJobRegistry(queue=self)
+ return FailedJobRegistry(queue=self, job_class=self.job_class)
+
+ @property
+ def started_job_registry(self):
+ """Returns this queue's FailedJobRegistry."""
+ from rq.registry import StartedJobRegistry
+ return StartedJobRegistry(queue=self, job_class=self.job_class)
+
+ @property
+ def finished_job_registry(self):
+ """Returns this queue's FailedJobRegistry."""
+ from rq.registry import FinishedJobRegistry
+ return FinishedJobRegistry(queue=self)
+
+ @property
+ def deferred_job_registry(self):
+ """Returns this queue's FailedJobRegistry."""
+ from rq.registry import DeferredJobRegistry
+ return DeferredJobRegistry(queue=self, job_class=self.job_class)
+
+ @property
+ def scheduled_job_registry(self):
+ """Returns this queue's FailedJobRegistry."""
+ from rq.registry import ScheduledJobRegistry
+ return ScheduledJobRegistry(queue=self, job_class=self.job_class)
def remove(self, job_or_id, pipeline=None):
"""Removes Job from queue, accepts either a Job instance or ID."""
@@ -220,6 +245,23 @@ class Queue(object):
else:
connection.rpush(self.key, job_id)
+ def create_job(self, func, args=None, kwargs=None, timeout=None,
+ result_ttl=None, ttl=None, failure_ttl=None,
+ description=None, depends_on=None, job_id=None,
+ meta=None):
+ """Creates a job based on parameters given."""
+ timeout = parse_timeout(timeout) or self._default_timeout
+
+ job = self.job_class.create(
+ func, args=args, kwargs=kwargs, connection=self.connection,
+ result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
+ status=JobStatus.QUEUED, description=description,
+ depends_on=depends_on, timeout=timeout, id=job_id,
+ origin=self.name, meta=meta
+ )
+
+ return job
+
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
@@ -248,12 +290,12 @@ class Queue(object):
job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl,
- status=JobStatus.QUEUED, description=description,
- depends_on=depends_on, timeout=timeout, id=job_id,
- origin=self.name, meta=meta)
+ description=description, depends_on=depends_on, origin=self.name,
+ id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout,
+ )
# If a _dependent_ job depends on any unfinished job, register all the
- #_dependent_ job's dependencies instead of enqueueing it.
+ # _dependent_ job's dependencies instead of enqueueing it.
#
# `Job#fetch_dependencies` sets WATCH on all dependencies. If
# WatchError is raised in the when the pipeline is executed, that means
@@ -338,6 +380,24 @@ class Queue(object):
at_front=at_front, meta=meta
)
+ def enqueue_at(self, datetime, func, *args, **kwargs):
+ """Schedules a job to be enqueued at specified time"""
+ from .registry import ScheduledJobRegistry
+
+ job = self.create_job(func, *args, **kwargs)
+ registry = ScheduledJobRegistry(queue=self)
+ with self.connection.pipeline() as pipeline:
+ job.save(pipeline=pipeline)
+ registry.schedule(job, datetime, pipeline=pipeline)
+ pipeline.execute()
+
+ return job
+
+ def enqueue_in(self, time_delta, func, *args, **kwargs):
+ """Schedules a job to be executed in a given `timedelta` object"""
+ return self.enqueue_at(datetime.now(utc) + time_delta,
+ func, *args, **kwargs)
+
def enqueue_job(self, job, pipeline=None, at_front=False):
"""Enqueues a job for delayed execution.
diff --git a/rq/registry.py b/rq/registry.py
index 104de06..9232ddf 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -1,4 +1,8 @@
-from .compat import as_text
+import calendar
+import time
+from datetime import datetime, timedelta
+
+from .compat import as_text, utc
from .connections import resolve_connection
from .defaults import DEFAULT_FAILURE_TTL
from .exceptions import InvalidJobOperation, NoSuchJobError
@@ -32,6 +36,9 @@ class BaseRegistry(object):
"""Returns the number of jobs in this registry"""
return self.count
+ def __eq__(self, other):
+ return (self.name == other.name and self.connection == other.connection)
+
def __contains__(self, item):
"""
Returns a boolean indicating registry contains the given
@@ -92,6 +99,11 @@ class BaseRegistry(object):
"""Returns Queue object associated with this registry."""
return Queue(self.name, connection=self.connection)
+ def get_expiration_time(self, job):
+ """Returns job's expiration time."""
+ score = self.connection.zscore(self.key, job.id)
+ return datetime.utcfromtimestamp(score)
+
class StartedJobRegistry(BaseRegistry):
"""
@@ -221,6 +233,70 @@ class DeferredJobRegistry(BaseRegistry):
pass
+class ScheduledJobRegistry(BaseRegistry):
+ """
+ Registry of scheduled jobs.
+ """
+ key_template = 'rq:scheduled:{0}'
+
+ def __init__(self, *args, **kwargs):
+ super(ScheduledJobRegistry, self).__init__(*args, **kwargs)
+ # The underlying implementation of get_jobs_to_enqueue() is
+ # the same as get_expired_job_ids, but get_expired_job_ids() doesn't
+ # make sense in this context
+ self.get_jobs_to_enqueue = self.get_expired_job_ids
+
+ def schedule(self, job, scheduled_datetime, pipeline=None):
+ """
+ Adds job to registry, scored by its execution time (in UTC).
+ If datetime has no tzinfo, it will assume localtimezone.
+ """
+ # If datetime has no timezone, assume server's local timezone
+ # if we're on Python 3. If we're on Python 2.7, raise an
+ # exception since Python < 3.2 has no builtin `timezone` class
+ if not scheduled_datetime.tzinfo:
+ try:
+ from datetime import timezone
+ except ImportError:
+ raise ValueError('datetime object with no timezone')
+ tz = timezone(timedelta(seconds=-time.timezone))
+ scheduled_datetime = scheduled_datetime.replace(tzinfo=tz)
+
+ timestamp = calendar.timegm(scheduled_datetime.utctimetuple())
+ return self.connection.zadd(self.key, {job.id: timestamp})
+
+ def cleanup(self):
+ """This method is only here to prevent errors because this method is
+ automatically called by `count()` and `get_job_ids()` methods
+ implemented in BaseRegistry."""
+ pass
+
+ def remove_jobs(self, timestamp=None, pipeline=None):
+ """Remove jobs whose timestamp is in the past from registry."""
+ connection = pipeline if pipeline is not None else self.connection
+ score = timestamp if timestamp is not None else current_timestamp()
+ return connection.zremrangebyscore(self.key, 0, score)
+
+ def get_jobs_to_schedule(self, timestamp=None):
+ """Remove jobs whose timestamp is in the past from registry."""
+ score = timestamp if timestamp is not None else current_timestamp()
+ return [as_text(job_id) for job_id in
+ self.connection.zrangebyscore(self.key, 0, score)]
+
+ def get_scheduled_time(self, job_or_id):
+ """Returns datetime (UTC) at which job is scheduled to be enqueued"""
+ if isinstance(job_or_id, self.job_class):
+ job_id = job_or_id.id
+ else:
+ job_id = job_or_id
+
+ score = self.connection.zscore(self.key, job_id)
+ if not score:
+ raise NoSuchJobError
+
+ return datetime.fromtimestamp(score, tz=utc)
+
+
def clean_registries(queue):
"""Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
registry = FinishedJobRegistry(name=queue.name,
diff --git a/rq/scheduler.py b/rq/scheduler.py
new file mode 100644
index 0000000..d4ebec5
--- /dev/null
+++ b/rq/scheduler.py
@@ -0,0 +1,197 @@
+import logging
+import os
+import signal
+import time
+import traceback
+
+from datetime import datetime
+from multiprocessing import Process
+
+from .job import Job
+from .queue import Queue
+from .registry import ScheduledJobRegistry
+from .utils import current_timestamp, enum
+
+
+SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
+SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
+
+format = "%(asctime)s: %(message)s"
+logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
+
+
+class RQScheduler(object):
+
+ # STARTED: scheduler has been started but sleeping
+ # WORKING: scheduler is in the midst of scheduling jobs
+ # STOPPED: scheduler is in stopped condition
+
+ Status = enum(
+ 'SchedulerStatus',
+ STARTED='started',
+ WORKING='working',
+ STOPPED='stopped'
+ )
+
+ def __init__(self, queues, connection, interval=1):
+ self._queue_names = set(parse_names(queues))
+ self._acquired_locks = set([])
+ self._scheduled_job_registries = []
+ self.lock_acquisition_time = None
+ self.connection = connection
+ self.interval = interval
+ self._stop_requested = False
+ self._status = self.Status.STOPPED
+ self._process = None
+
+ @property
+ def acquired_locks(self):
+ return self._acquired_locks
+
+ @property
+ def status(self):
+ return self._status
+
+ @property
+ def should_reacquire_locks(self):
+ """Returns True if lock_acquisition_time is longer than 15 minutes ago"""
+ if self._queue_names == self.acquired_locks:
+ return False
+ if not self.lock_acquisition_time:
+ return True
+ return (datetime.now() - self.lock_acquisition_time).total_seconds() > 900
+
+ def acquire_locks(self, auto_start=False):
+ """Returns names of queue it successfully acquires lock on"""
+ successful_locks = set([])
+ pid = os.getpid()
+ logging.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
+ for name in self._queue_names:
+ if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=5):
+ successful_locks.add(name)
+ self._acquired_locks = self._acquired_locks.union(successful_locks)
+ if self._acquired_locks:
+ self.prepare_registries(self._acquired_locks)
+
+ self.lock_acquisition_time = datetime.now()
+
+ # If auto_start is requested and scheduler is not started,
+ # run self.start()
+ if self._acquired_locks and auto_start:
+ if not self._process:
+ self.start()
+
+ return successful_locks
+
+ def prepare_registries(self, queue_names):
+ """Prepare scheduled job registries for use"""
+ self._scheduled_job_registries = []
+ for name in queue_names:
+ self._scheduled_job_registries.append(
+ ScheduledJobRegistry(name, connection=self.connection)
+ )
+
+ @classmethod
+ def get_locking_key(self, name):
+ """Returns scheduler key for a given queue name"""
+ return SCHEDULER_LOCKING_KEY_TEMPLATE % name
+
+ def enqueue_scheduled_jobs(self):
+ """Enqueue jobs whose timestamp is in the past"""
+ self._status = self.Status.WORKING
+ for registry in self._scheduled_job_registries:
+ timestamp = current_timestamp()
+
+ # TODO: try to use Lua script to make get_jobs_to_schedule()
+ # and remove_jobs() atomic
+ job_ids = registry.get_jobs_to_schedule(timestamp)
+
+ if not job_ids:
+ continue
+
+ queue = Queue(registry.name, connection=self.connection)
+
+ with self.connection.pipeline() as pipeline:
+ # This should be done in bulk
+ for job_id in job_ids:
+ job = Job.fetch(job_id, connection=self.connection)
+ queue.enqueue_job(job, pipeline=pipeline)
+ registry.remove_jobs(timestamp)
+ pipeline.execute()
+ self._status = self.Status.STARTED
+
+ def _install_signal_handlers(self):
+ """Installs signal handlers for handling SIGINT and SIGTERM
+ gracefully.
+ """
+ signal.signal(signal.SIGINT, self.request_stop)
+ signal.signal(signal.SIGTERM, self.request_stop)
+
+ def request_stop(self, signum=None, frame=None):
+ """Toggle self._stop_requested that's checked on every loop"""
+ self._stop_requested = True
+
+ def heartbeat(self):
+ """Updates the TTL on scheduler keys and the locks"""
+ logging.info("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks))
+ if len(self._queue_names) > 1:
+ with self.connection.pipeline() as pipeline:
+ for name in self._queue_names:
+ key = self.get_locking_key(name)
+ pipeline.expire(key, self.interval + 5)
+ pipeline.execute()
+ else:
+ key = self.get_locking_key(next(iter(self._queue_names)))
+ self.connection.expire(key, self.interval + 5)
+
+ def stop(self):
+ logging.info("Scheduler stopping, releasing locks for %s...",
+ ','.join(self._queue_names))
+ keys = [self.get_locking_key(name) for name in self._queue_names]
+ self.connection.delete(*keys)
+ self._status = self.Status.STOPPED
+
+ def start(self):
+ self._status = self.Status.STARTED
+ self._process = Process(target=run, args=(self,), name='Scheduler')
+ self._process.start()
+ return self._process
+
+ def work(self):
+ self._install_signal_handlers()
+ while True:
+ if self._stop_requested:
+ self.stop()
+ break
+
+ if self.should_reacquire_locks:
+ self.acquire_locks()
+
+ self.enqueue_scheduled_jobs()
+ self.heartbeat()
+ time.sleep(self.interval)
+
+
+def run(scheduler):
+ logging.info("Scheduler for %s started with PID %s",
+ ','.join(scheduler._queue_names), os.getpid())
+ try:
+ scheduler.work()
+ except: # noqa
+ logging.error(
+ 'Scheduler [PID %s] raised an exception.\n%s',
+ os.getpid(), traceback.format_exc()
+ )
+ raise
+ logging.info("Scheduler with PID %s has stopped", os.getpid())
+
+
+def parse_names(queues_or_names):
+ """Given a list of strings or queues, returns queue names"""
+ names = []
+ for queue_or_name in queues_or_names:
+ if isinstance(queue_or_name, Queue):
+ names.append(queue_or_name.name)
+ else:
+ names.append(str(queue_or_name))
+ return names
diff --git a/rq/utils.py b/rq/utils.py
index b479ec7..c399b55 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -125,10 +125,7 @@ class ColorizingStreamHandler(logging.StreamHandler):
def __init__(self, exclude=None, *args, **kwargs):
self.exclude = exclude
- if is_python_version((2, 6)):
- logging.StreamHandler.__init__(self, *args, **kwargs)
- else:
- super(ColorizingStreamHandler, self).__init__(*args, **kwargs)
+ super(ColorizingStreamHandler, self).__init__(*args, **kwargs)
@property
def is_tty(self):
diff --git a/rq/worker.py b/rq/worker.py
index 6960d34..25b98e8 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -33,8 +33,8 @@ from .exceptions import DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import Queue
-from .registry import (FailedJobRegistry, FinishedJobRegistry,
- StartedJobRegistry, clean_registries)
+from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries
+from .scheduler import RQScheduler
from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import (backend_class, ensure_list, enum,
@@ -204,6 +204,7 @@ class Worker(object):
self.failed_job_count = 0
self.total_working_time = 0
self.birth_date = None
+ self.scheduler = None
self.disable_default_exception_handler = disable_default_exception_handler
@@ -221,11 +222,11 @@ class Worker(object):
def queue_names(self):
"""Returns the queue names of this worker's queues."""
- return list(map(lambda q: q.name, self.queues))
+ return [queue.name for queue in self.queues]
def queue_keys(self):
"""Returns the Redis keys representing this worker's queues."""
- return list(map(lambda q: q.key, self.queues))
+ return [queue.key for queue in self.queues]
@property
def key(self):
@@ -411,7 +412,11 @@ class Worker(object):
self.set_shutdown_requested_date()
self.log.debug('Stopping after current horse is finished. '
'Press Ctrl+C again for a cold shutdown.')
+ if self.scheduler:
+ self.stop_scheduler()
else:
+ if self.scheduler:
+ self.stop_scheduler()
raise StopRequested()
def handle_warm_shutdown_request(self):
@@ -440,8 +445,22 @@ class Worker(object):
if before_state:
self.set_state(before_state)
+ def run_maintenance_tasks(self):
+ """
+ Runs periodic maintenance tasks, these include:
+ 1. Check if scheduler should be started. This check should not be run
+ on first run since worker.work() already calls
+ `scheduler.enqueue_scheduled_jobs()` on startup.
+ 2. Cleaning registries
+ """
+ # No need to try to start scheduler on first run
+ if self.last_cleaned_at:
+ if self.scheduler and not self.scheduler._process:
+ self.scheduler.acquire_locks(auto_start=True)
+ self.clean_registries()
+
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
- log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None):
+ log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False):
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
@@ -450,8 +469,7 @@ class Worker(object):
The return value indicates whether any jobs were processed.
"""
- setup_loghandlers(logging_level, date_format, log_format)
- self._install_signal_handlers()
+ setup_loghandlers(logging_level, date_format, log_format)
completed_jobs = 0
self.register_birth()
self.log.info("Worker %s: started, version %s", self.key, VERSION)
@@ -459,13 +477,27 @@ class Worker(object):
qnames = self.queue_names()
self.log.info('*** Listening on %s...', green(', '.join(qnames)))
+ if with_scheduler:
+ self.scheduler = RQScheduler(self.queues, connection=self.connection)
+ self.scheduler.acquire_locks()
+ # If lock is acquired, start scheduler
+ if self.scheduler.acquired_locks:
+ # If worker is run on burst mode, enqueue_scheduled_jobs()
+ # before working. Otherwise, start scheduler in a separate process
+ if burst:
+ self.scheduler.enqueue_scheduled_jobs()
+ else:
+ self.scheduler.start()
+
+ self._install_signal_handlers()
+
try:
while True:
try:
self.check_for_suspension(burst)
if self.should_run_maintenance_tasks:
- self.clean_registries()
+ self.run_maintenance_tasks()
if self._stop_requested:
self.log.info('Worker %s: stopping on request', self.key)
@@ -507,9 +539,23 @@ class Worker(object):
break
finally:
if not self.is_horse:
+
+ if self.scheduler:
+ self.stop_scheduler()
+
self.register_death()
return bool(completed_jobs)
+ def stop_scheduler(self):
+ """Ensure scheduler process is stopped"""
+ if self.scheduler._process and self.scheduler._process.pid:
+ # Send the kill signal to scheduler process
+ try:
+ os.kill(self.scheduler._process.pid, signal.SIGTERM)
+ except OSError:
+ pass
+ self.scheduler._process.join()
+
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = ','.join(self.queue_names())
@@ -521,6 +567,9 @@ class Worker(object):
while True:
self.heartbeat()
+ if self.should_run_maintenance_tasks:
+ self.run_maintenance_tasks()
+
try:
result = self.queue_class.dequeue_any(self.queues, timeout,
connection=self.connection,
@@ -798,9 +847,7 @@ class Worker(object):
# Don't clobber the user's meta dictionary!
job.save(pipeline=pipeline, include_meta=False)
- finished_job_registry = FinishedJobRegistry(job.origin,
- self.connection,
- job_class=self.job_class)
+ finished_job_registry = queue.finished_job_registry
finished_job_registry.add(job, result_ttl, pipeline)
job.cleanup(result_ttl, pipeline=pipeline,
@@ -819,9 +866,7 @@ class Worker(object):
self.prepare_job_execution(job, heartbeat_ttl)
push_connection(self.connection)
- started_job_registry = StartedJobRegistry(job.origin,
- self.connection,
- job_class=self.job_class)
+ started_job_registry = queue.started_job_registry
try:
job.started_at = utcnow()
@@ -837,7 +882,7 @@ class Worker(object):
self.handle_job_success(job=job,
queue=queue,
started_job_registry=started_job_registry)
- except:
+ except: # NOQA
job.ended_at = utcnow()
exc_info = sys.exc_info()
exc_string = self._get_safe_exception_string(
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 882bdad..46cdaac 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -8,6 +8,7 @@ from __future__ import (absolute_import, division, print_function,
import os
import time
+import signal
import sys
from rq import Connection, get_current_job, get_current_connection, Queue
@@ -164,3 +165,13 @@ def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay):
class DummyQueue(object):
pass
+
+
+def kill_worker(pid, double_kill, interval=0.5):
+ # wait for the worker to be started over on the main process
+ time.sleep(interval)
+ os.kill(pid, signal.SIGTERM)
+ if double_kill:
+ # give the worker time to switch signal handler
+ time.sleep(interval)
+ os.kill(pid, signal.SIGTERM)
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 343e286..1dea1e3 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -2,14 +2,17 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
+from datetime import datetime
+
from click.testing import CliRunner
from redis import Redis
from rq import Queue
+from rq.compat import utc
from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig
from rq.job import Job
-from rq.registry import FailedJobRegistry
+from rq.registry import FailedJobRegistry, ScheduledJobRegistry
from rq.worker import Worker, WorkerStatus
import pytest
@@ -244,6 +247,21 @@ class TestRQCli(RQTestCase):
self.assertTrue(len(pid.read()) > 0)
self.assert_normal_execution(result)
+ def test_worker_with_scheduler(self):
+ """rq worker -u <url> --with-scheduler"""
+ queue = Queue(connection=self.connection)
+ queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
+ registry = ScheduledJobRegistry(queue=queue)
+
+ runner = CliRunner()
+ result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
+ self.assert_normal_execution(result)
+ self.assertEqual(len(registry), 1) # 1 job still scheduled
+
+ result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--with-scheduler'])
+ self.assert_normal_execution(result)
+ self.assertEqual(len(registry), 0) # Job has been enqueued
+
def test_worker_logging_options(self):
"""--quiet and --verbose logging options are supported"""
runner = CliRunner()
@@ -256,7 +274,7 @@ class TestRQCli(RQTestCase):
# --quiet and --verbose are mutually exclusive
result = runner.invoke(main, args + ['--quiet', '--verbose'])
self.assertNotEqual(result.exit_code, 0)
-
+
def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>"""
connection = Redis.from_url(self.redis_url)
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 22b6b11..865e14c 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -2,11 +2,18 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
+from datetime import datetime, timedelta
+
from rq import Queue
-from rq.exceptions import InvalidJobDependency, NoSuchJobError
+from rq.compat import utc
+from rq.exceptions import NoSuchJobError
+
from rq.job import Job, JobStatus
-from rq.registry import DeferredJobRegistry
+from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
+ FinishedJobRegistry, ScheduledJobRegistry,
+ StartedJobRegistry)
from rq.worker import Worker
+
from tests import RQTestCase
from tests.fixtures import echo, say_hello
@@ -236,7 +243,7 @@ class TestQueue(RQTestCase):
None
)
self.assertEqual(q.count, 0)
-
+
def test_enqueue_with_ttl(self):
"""Negative TTL value is not allowed"""
queue = Queue()
@@ -520,3 +527,23 @@ class TestQueue(RQTestCase):
job_fetch = q1.fetch_job(job_orig.id)
self.assertIsNotNone(job_fetch)
+
+ def test_getting_registries(self):
+ """Getting job registries from queue object"""
+ queue = Queue('example')
+ self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
+ self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
+ self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
+ self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
+ self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
+
+
+class TestJobScheduling(RQTestCase):
+ def test_enqueue_at(self):
+ """enqueue_at() creates a job in ScheduledJobRegistry"""
+ queue = Queue(connection=self.testconn)
+ scheduled_time = datetime.now(utc) + timedelta(seconds=10)
+ job = queue.enqueue_at(scheduled_time, say_hello)
+ registry = ScheduledJobRegistry(queue=queue)
+ self.assertIn(job, registry)
+ self.assertTrue(registry.get_expiration_time(job), scheduled_time)
diff --git a/tests/test_registry.py b/tests/test_registry.py
index d0d7b09..b7921d1 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
+from datetime import datetime, timedelta
+
from rq.compat import as_text
from rq.defaults import DEFAULT_FAILURE_TTL
from rq.exceptions import InvalidJobOperation
@@ -57,6 +59,18 @@ class TestRegistry(RQTestCase):
self.assertTrue(job in registry)
self.assertTrue(job.id in registry)
+ def test_get_expiration_time(self):
+ """registry.get_expiration_time() returns correct datetime objects"""
+ registry = StartedJobRegistry(connection=self.testconn)
+ queue = Queue(connection=self.testconn)
+ job = queue.enqueue(say_hello)
+
+ registry.add(job, 5)
+ self.assertEqual(
+ registry.get_expiration_time(job),
+ (datetime.utcnow() + timedelta(seconds=5)).replace(microsecond=0)
+ )
+
def test_add_and_remove(self):
"""Adding and removing job to StartedJobRegistry."""
timestamp = current_timestamp()
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
new file mode 100644
index 0000000..459fec8
--- /dev/null
+++ b/tests/test_scheduler.py
@@ -0,0 +1,291 @@
+import os
+import time
+
+from datetime import datetime, timedelta
+from multiprocessing import Process
+
+from rq import Queue
+from rq.compat import utc, PY2
+from rq.exceptions import NoSuchJobError
+from rq.job import Job
+from rq.registry import FinishedJobRegistry, ScheduledJobRegistry
+from rq.scheduler import RQScheduler
+from rq.utils import current_timestamp
+from rq.worker import Worker
+
+from .fixtures import kill_worker, say_hello
+from tests import RQTestCase
+
+import mock
+
+
+class TestScheduledJobRegistry(RQTestCase):
+
+ def test_get_jobs_to_enqueue(self):
+ """Getting job ids to enqueue from ScheduledJobRegistry."""
+ queue = Queue(connection=self.testconn)
+ registry = ScheduledJobRegistry(queue=queue)
+ timestamp = current_timestamp()
+
+ self.testconn.zadd(registry.key, {'foo': 1})
+ self.testconn.zadd(registry.key, {'bar': timestamp + 10})
+ self.testconn.zadd(registry.key, {'baz': timestamp + 30})
+
+ self.assertEqual(registry.get_jobs_to_enqueue(), ['foo'])
+ self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20),
+ ['foo', 'bar'])
+
+ def test_get_scheduled_time(self):
+ """get_scheduled_time() returns job's scheduled datetime"""
+ queue = Queue(connection=self.testconn)
+ registry = ScheduledJobRegistry(queue=queue)
+
+ job = Job.create('myfunc', connection=self.testconn)
+ job.save()
+ dt = datetime(2019, 1, 1, tzinfo=utc)
+ registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
+ self.assertEqual(registry.get_scheduled_time(job), dt)
+ # get_scheduled_time() should also work with job ID
+ self.assertEqual(registry.get_scheduled_time(job.id), dt)
+
+ # registry.get_scheduled_time() raises NoSuchJobError if
+ # job.id is not found
+ self.assertRaises(NoSuchJobError, registry.get_scheduled_time, '123')
+
+ def test_schedule(self):
+ """Adding job with the correct score to ScheduledJobRegistry"""
+ queue = Queue(connection=self.testconn)
+ job = Job.create('myfunc', connection=self.testconn)
+ job.save()
+ registry = ScheduledJobRegistry(queue=queue)
+
+ if PY2:
+ # On Python 2, datetime needs to have timezone
+ self.assertRaises(ValueError, registry.schedule, job, datetime(2019, 1, 1))
+ registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
+ self.assertEqual(self.testconn.zscore(registry.key, job.id),
+ 1546300800) # 2019-01-01 UTC in Unix timestamp
+ else:
+ from datetime import timezone
+ # If we pass in a datetime with no timezone, `schedule()`
+ # assumes local timezone so depending on your local timezone,
+ # the timestamp maybe different
+ registry.schedule(job, datetime(2019, 1, 1))
+ self.assertEqual(self.testconn.zscore(registry.key, job.id),
+ 1546300800 + time.timezone) # 2019-01-01 UTC in Unix timestamp
+
+ # Score is always stored in UTC even if datetime is in a different tz
+ tz = timezone(timedelta(hours=7))
+ job = Job.create('myfunc', connection=self.testconn)
+ job.save()
+ registry.schedule(job, datetime(2019, 1, 1, 7, tzinfo=tz))
+ self.assertEqual(self.testconn.zscore(registry.key, job.id),
+ 1546300800) # 2019-01-01 UTC in Unix timestamp
+
+
+class TestScheduler(RQTestCase):
+
+ def test_init(self):
+ """Scheduler can be instantiated with queues or queue names"""
+ foo_queue = Queue('foo', connection=self.testconn)
+ scheduler = RQScheduler([foo_queue, 'bar'], connection=self.testconn)
+ self.assertEqual(scheduler._queue_names, {'foo', 'bar'})
+ self.assertEqual(scheduler.status, RQScheduler.Status.STOPPED)
+
+ def test_should_reacquire_locks(self):
+ """scheduler.should_reacquire_locks works properly"""
+ queue = Queue(connection=self.testconn)
+ scheduler = RQScheduler([queue], connection=self.testconn)
+ self.assertTrue(scheduler.should_reacquire_locks)
+ scheduler.acquire_locks()
+ self.assertIsNotNone(scheduler.lock_acquisition_time)
+
+ # scheduler.should_reacquire_locks always returns False if
+ # scheduler.acquired_locks and scheduler._queue_names are the same
+ self.assertFalse(scheduler.should_reacquire_locks)
+ scheduler.lock_acquisition_time = datetime.now() - timedelta(minutes=16)
+ self.assertFalse(scheduler.should_reacquire_locks)
+
+ scheduler._queue_names = set(['default', 'foo'])
+ self.assertTrue(scheduler.should_reacquire_locks)
+ scheduler.acquire_locks()
+ self.assertFalse(scheduler.should_reacquire_locks)
+
+ def test_lock_acquisition(self):
+ """Test lock acquisition"""
+ name_1 = 'lock-test-1'
+ name_2 = 'lock-test-2'
+ name_3 = 'lock-test-3'
+ scheduler = RQScheduler([name_1], self.testconn)
+
+ self.assertEqual(scheduler.acquire_locks(), {name_1})
+ self.assertEqual(scheduler._acquired_locks, {name_1})
+ self.assertEqual(scheduler.acquire_locks(), set([]))
+
+ # Only name_2 is returned since name_1 is already locked
+ scheduler = RQScheduler([name_1, name_2], self.testconn)
+ self.assertEqual(scheduler.acquire_locks(), {name_2})
+ self.assertEqual(scheduler._acquired_locks, {name_2})
+
+ # When a new lock is successfully acquired, _acquired_locks is added
+ scheduler._queue_names.add(name_3)
+ self.assertEqual(scheduler.acquire_locks(), {name_3})
+ self.assertEqual(scheduler._acquired_locks, {name_2, name_3})
+
+ def test_lock_acquisition_with_auto_start(self):
+ """Test lock acquisition with auto_start=True"""
+ scheduler = RQScheduler(['auto-start'], self.testconn)
+ with mock.patch.object(scheduler, 'start') as mocked:
+ scheduler.acquire_locks(auto_start=True)
+ self.assertEqual(mocked.call_count, 1)
+
+ # If process has started, scheduler.start() won't be called
+ scheduler = RQScheduler(['auto-start2'], self.testconn)
+ scheduler._process = 1
+ with mock.patch.object(scheduler, 'start') as mocked:
+ scheduler.acquire_locks(auto_start=True)
+ self.assertEqual(mocked.call_count, 0)
+
+ def test_heartbeat(self):
+ """Test that heartbeat updates locking keys TTL"""
+ name_1 = 'lock-test-1'
+ name_2 = 'lock-test-2'
+ scheduler = RQScheduler([name_1, name_2], self.testconn)
+ scheduler.acquire_locks()
+
+ locking_key_1 = RQScheduler.get_locking_key(name_1)
+ locking_key_2 = RQScheduler.get_locking_key(name_2)
+
+ with self.testconn.pipeline() as pipeline:
+ pipeline.expire(locking_key_1, 1000)
+ pipeline.expire(locking_key_2, 1000)
+
+ scheduler.heartbeat()
+ self.assertEqual(self.testconn.ttl(locking_key_1), 6)
+ self.assertEqual(self.testconn.ttl(locking_key_1), 6)
+
+ # scheduler.stop() releases locks and sets status to STOPPED
+ scheduler._status = scheduler.Status.WORKING
+ scheduler.stop()
+ self.assertFalse(self.testconn.exists(locking_key_1))
+ self.assertFalse(self.testconn.exists(locking_key_2))
+ self.assertEqual(scheduler.status, scheduler.Status.STOPPED)
+
+ # Heartbeat also works properly for schedulers with a single queue
+ scheduler = RQScheduler([name_1], self.testconn)
+ scheduler.acquire_locks()
+ self.testconn.expire(locking_key_1, 1000)
+ scheduler.heartbeat()
+ self.assertEqual(self.testconn.ttl(locking_key_1), 6)
+
+ def test_enqueue_scheduled_jobs(self):
+ """Scheduler can enqueue scheduled jobs"""
+ queue = Queue(connection=self.testconn)
+ registry = ScheduledJobRegistry(queue=queue)
+ job = Job.create('myfunc', connection=self.testconn)
+ job.save()
+ registry.schedule(job, datetime(2019, 1, 1, tzinfo=utc))
+ scheduler = RQScheduler([queue], connection=self.testconn)
+ scheduler.acquire_locks()
+ scheduler.enqueue_scheduled_jobs()
+ self.assertEqual(len(queue), 1)
+
+ # After job is scheduled, registry should be empty
+ self.assertEqual(len(registry), 0)
+
+ # Jobs scheduled in the far future should not be affected
+ registry.schedule(job, datetime(2100, 1, 1, tzinfo=utc))
+ scheduler.enqueue_scheduled_jobs()
+ self.assertEqual(len(queue), 1)
+
+ def test_prepare_registries(self):
+ """prepare_registries() creates self._scheduled_job_registries"""
+ foo_queue = Queue('foo', connection=self.testconn)
+ bar_queue = Queue('bar', connection=self.testconn)
+ scheduler = RQScheduler([foo_queue, bar_queue], connection=self.testconn)
+ self.assertEqual(scheduler._scheduled_job_registries, [])
+ scheduler.prepare_registries([foo_queue.name])
+ self.assertEqual(scheduler._scheduled_job_registries, [ScheduledJobRegistry(queue=foo_queue)])
+ scheduler.prepare_registries([foo_queue.name, bar_queue.name])
+ self.assertEqual(
+ scheduler._scheduled_job_registries,
+ [ScheduledJobRegistry(queue=foo_queue), ScheduledJobRegistry(queue=bar_queue)]
+ )
+
+
+class TestWorker(RQTestCase):
+
+ def test_work_burst(self):
+ """worker.work() with scheduler enabled works properly"""
+ queue = Queue(connection=self.testconn)
+ worker = Worker(queues=[queue], connection=self.testconn)
+ worker.work(burst=True, with_scheduler=False)
+ self.assertIsNone(worker.scheduler)
+
+ worker = Worker(queues=[queue], connection=self.testconn)
+ worker.work(burst=True, with_scheduler=True)
+ self.assertIsNotNone(worker.scheduler)
+
+ @mock.patch.object(RQScheduler, 'acquire_locks')
+ def test_run_maintenance_tasks(self, mocked):
+ """scheduler.acquire_locks() is called only when scheduled is enabled"""
+ queue = Queue(connection=self.testconn)
+ worker = Worker(queues=[queue], connection=self.testconn)
+
+ worker.run_maintenance_tasks()
+ self.assertEqual(mocked.call_count, 0)
+
+ worker.last_cleaned_at = None
+ worker.scheduler = RQScheduler([queue], connection=self.testconn)
+ worker.run_maintenance_tasks()
+ self.assertEqual(mocked.call_count, 0)
+
+ worker.last_cleaned_at = datetime.now()
+ worker.run_maintenance_tasks()
+ self.assertEqual(mocked.call_count, 1)
+
+ def test_work(self):
+ queue = Queue(connection=self.testconn)
+ worker = Worker(queues=[queue], connection=self.testconn)
+ p = Process(target=kill_worker, args=(os.getpid(), False, 5))
+
+ p.start()
+ queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
+ worker.work(burst=False, with_scheduler=True)
+ p.join(1)
+ self.assertIsNotNone(worker.scheduler)
+ registry = FinishedJobRegistry(queue=queue)
+ self.assertEqual(len(registry), 1)
+
+
+class TestQueue(RQTestCase):
+
+ def test_enqueue_at(self):
+ """queue.enqueue_at() puts job in the scheduled"""
+ queue = Queue(connection=self.testconn)
+ registry = ScheduledJobRegistry(queue=queue)
+ scheduler = RQScheduler([queue], connection=self.testconn)
+ scheduler.acquire_locks()
+ # Jobs created using enqueue_at is put in the ScheduledJobRegistry
+ queue.enqueue_at(datetime(2019, 1, 1, tzinfo=utc), say_hello)
+ self.assertEqual(len(queue), 0)
+ self.assertEqual(len(registry), 1)
+
+ # After enqueue_scheduled_jobs() is called, the registry is empty
+ # and job is enqueued
+ scheduler.enqueue_scheduled_jobs()
+ self.assertEqual(len(queue), 1)
+ self.assertEqual(len(registry), 0)
+
+ def test_enqueue_in(self):
+ """queue.enqueue_in() schedules job correctly"""
+ queue = Queue(connection=self.testconn)
+ registry = ScheduledJobRegistry(queue=queue)
+
+ job = queue.enqueue_in(timedelta(seconds=30), say_hello)
+ now = datetime.now(utc)
+ scheduled_time = registry.get_scheduled_time(job)
+ # Ensure that job is scheduled roughly 30 seconds from now
+ self.assertTrue(
+ now + timedelta(seconds=28) < scheduled_time < now + timedelta(seconds=32)
+ )
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 25fde13..e983bf5 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -22,9 +22,9 @@ from mock import Mock
from tests import RQTestCase, slow
from tests.fixtures import (
- create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello,
- say_pid, run_dummy_heroku_worker, access_self, modify_self,
- modify_self_and_error, long_running_job, save_key_ttl
+ access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing,
+ kill_worker, long_running_job, modify_self, modify_self_and_error,
+ run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid,
)
from rq import Queue, SimpleWorker, Worker, get_current_connection
@@ -59,6 +59,9 @@ class TestWorker(RQTestCase):
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
+ self.assertEqual(w.queue_keys(), [w.queues[0].key, w.queues[1].key])
+ self.assertEqual(w.queue_names(), ['foo', 'bar'])
+
# With iterable of strings
w = Worker(iter(['foo', 'bar']))
self.assertEqual(w.queues[0].name, 'foo')
@@ -952,16 +955,6 @@ class TestWorker(RQTestCase):
self.assertEqual(worker.python_version, python_version)
-def kill_worker(pid, double_kill):
- # wait for the worker to be started over on the main process
- time.sleep(0.5)
- os.kill(pid, signal.SIGTERM)
- if double_kill:
- # give the worker time to switch signal handler
- time.sleep(0.5)
- os.kill(pid, signal.SIGTERM)
-
-
def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait)
os.kill(pid, signal.SIGKILL)
@@ -1203,7 +1196,7 @@ class TestExceptionHandlerMessageEncoding(RQTestCase):
# Mimic how exception info is actually passed forwards
try:
raise Exception(u"💪")
- except:
+ except Exception:
self.exc_info = sys.exc_info()
def test_handle_exception_handles_non_ascii_in_exception_message(self):