summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Tushman <jtushman@pipewave.com>2014-10-09 14:09:52 -0400
committerJonathan Tushman <jtushman@pipewave.com>2014-12-12 16:58:41 -0500
commit82333d2ad5053876004ff57af209ed7ab447098a (patch)
tree5c3bb7dd2d4811a200c0517ee118d772e4252d3b
parent786d3c588722e66b69cd40962cf914f6beb2a5e5 (diff)
downloadrq-82333d2ad5053876004ff57af209ed7ab447098a.tar.gz
triggering shutdown by setting a redis flag
-rw-r--r--.gitignore3
-rw-r--r--requirements.txt4
-rwxr-xr-xrq/cli/cli.py45
-rw-r--r--rq/cli/helpers.py7
-rw-r--r--rq/job.py21
-rw-r--r--rq/queue.py10
-rw-r--r--rq/suspension.py18
-rw-r--r--rq/utils.py10
-rw-r--r--rq/worker.py74
-rw-r--r--tests/test_cli.py52
-rw-r--r--tests/test_queue.py12
-rw-r--r--tests/test_worker.py43
12 files changed, 236 insertions, 63 deletions
diff --git a/.gitignore b/.gitignore
index 25a046e..f95cc7b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,3 @@
.tox
.vagrant
Vagrantfile
-
-# PyCharm
-.idea
diff --git a/requirements.txt b/requirements.txt
index 539b9a4..8da152d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-redis
-click
+redis==2.7.0
+click>=3.0.0
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 18979dc..afb32ea 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -16,6 +16,7 @@ from rq import Connection, get_failed_queue, Queue
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
+from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)
@@ -24,8 +25,12 @@ from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.')
+config_option = click.option('--config', '-c', help='Module containing RQ settings.')
-def connect(url):
+
+def connect(url, config=None):
+ settings = read_config_file(config) if config else {}
+ url = url or settings.get('REDIS_URL')
return StrictRedis.from_url(url or 'redis://localhost:6379/0')
@@ -120,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
@main.command()
@url_option
-@click.option('--config', '-c', help='Module containing RQ settings.')
+@config_option
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@@ -158,7 +163,12 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)
+ if is_suspended(conn):
+ click.secho("The worker has been paused, run reset_paused", fg='red')
+ sys.exit(1)
+
try:
+
queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
@@ -178,3 +188,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
except ConnectionError as e:
print(e)
sys.exit(1)
+
+
+@main.command()
+@url_option
+@config_option
+@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
+def suspend(url, config, duration):
+    """Suspends all workers; to resume, run `rq resume`"""
+ if duration is not None and duration < 1:
+ click.echo("Duration must be an integer greater than 1")
+ sys.exit(1)
+
+ connection = connect(url, config)
+ connection_suspend(connection, duration)
+
+ if duration:
+ msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
+ automatically resume""".format(duration)
+ click.echo(msg)
+ else:
+ click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed")
+
+
+@main.command()
+@url_option
+@config_option
+def resume(url, config):
+    """Resumes processing of queues that were suspended with `rq suspend`"""
+ connection = connect(url, config)
+ connection_resume(connection)
+ click.echo("Resuming workers.")
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index f25cc81..9ea4b58 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -8,7 +8,9 @@ from functools import partial
import click
from rq import Queue, Worker
+from rq.worker import WorkerStatus
from rq.logutils import setup_loghandlers
+from rq.suspension import is_suspended
red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
@@ -39,8 +41,9 @@ def get_scale(x):
def state_symbol(state):
symbols = {
- 'busy': red('busy'),
- 'idle': green('idle'),
+ WorkerStatus.BUSY: red('busy'),
+ WorkerStatus.IDLE: green('idle'),
+ WorkerStatus.SUSPENDED: yellow('suspended'),
}
try:
return symbols[state]
diff --git a/rq/job.py b/rq/job.py
index e8080f8..7b17492 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -12,7 +12,7 @@ from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack
-from .utils import import_attribute, utcformat, utcnow, utcparse
+from .utils import import_attribute, utcformat, utcnow, utcparse, enum
try:
import cPickle as pickle
@@ -25,16 +25,7 @@ dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
loads = pickle.loads
-def enum(name, *sequential, **named):
- values = dict(zip(sequential, range(len(sequential))), **named)
-
- # NOTE: Yes, we *really* want to cast using str() here.
- # On Python 2 type() requires a byte string (which is str() on Python 2).
- # On Python 3 it does not matter, so we'll use str(), which acts as
- # a no-op.
- return type(str(name), (), values)
-
-Status = enum('Status',
+JobStatus = enum('JobStatus',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')
@@ -166,19 +157,19 @@ class Job(object):
@property
def is_finished(self):
- return self.get_status() == Status.FINISHED
+ return self.get_status() == JobStatus.FINISHED
@property
def is_queued(self):
- return self.get_status() == Status.QUEUED
+ return self.get_status() == JobStatus.QUEUED
@property
def is_failed(self):
- return self.get_status() == Status.FAILED
+ return self.get_status() == JobStatus.FAILED
@property
def is_started(self):
- return self.get_status() == Status.STARTED
+ return self.get_status() == JobStatus.STARTED
@property
def dependency(self):
diff --git a/rq/queue.py b/rq/queue.py
index ff5f860..61f5575 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -5,7 +5,7 @@ from __future__ import (absolute_import, division, print_function,
import uuid
from .connections import resolve_connection
-from .job import Job, Status
+from .job import Job, JobStatus
from .utils import import_attribute, utcnow
from .exceptions import (DequeueTimeout, InvalidJobOperationError,
@@ -180,7 +180,7 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection,
- result_ttl=result_ttl, status=Status.QUEUED,
+ result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on, timeout=timeout,
id=job_id)
@@ -195,7 +195,7 @@ class Queue(object):
while True:
try:
pipe.watch(depends_on.key)
- if depends_on.get_status() != Status.FINISHED:
+ if depends_on.get_status() != JobStatus.FINISHED:
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
pipe.execute()
@@ -390,7 +390,7 @@ class Queue(object):
class FailedQueue(Queue):
def __init__(self, connection=None):
- super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
+ super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
@@ -417,7 +417,7 @@ class FailedQueue(Queue):
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
- job.set_status(Status.QUEUED)
+ job.set_status(JobStatus.QUEUED)
job.exc_info = None
q = Queue(job.origin, connection=self.connection)
q.enqueue_job(job)
diff --git a/rq/suspension.py b/rq/suspension.py
new file mode 100644
index 0000000..b734acd
--- /dev/null
+++ b/rq/suspension.py
@@ -0,0 +1,18 @@
+WORKERS_SUSPENDED = 'rq:suspended'
+
+
+def is_suspended(connection):
+ return connection.exists(WORKERS_SUSPENDED)
+
+
+def suspend(connection, ttl=None):
+ """ttl = time to live in seconds. Default is no expiration
+ Note: If you pass in 0 it will invalidate right away
+ """
+ connection.set(WORKERS_SUSPENDED, 1)
+ if ttl is not None:
+ connection.expire(WORKERS_SUSPENDED, ttl)
+
+
+def resume(connection):
+ return connection.delete(WORKERS_SUSPENDED) \ No newline at end of file
diff --git a/rq/utils.py b/rq/utils.py
index 8233ac6..db56020 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -208,3 +208,13 @@ def first(iterable, default=None, key=None):
def current_timestamp():
"""Returns current UTC timestamp"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())
+
+
+def enum(name, *sequential, **named):
+ values = dict(zip(sequential, range(len(sequential))), **named)
+
+ # NOTE: Yes, we *really* want to cast using str() here.
+ # On Python 2 type() requires a byte string (which is str() on Python 2).
+ # On Python 3 it does not matter, so we'll use str(), which acts as
+ # a no-op.
+ return type(str(name), (), values) \ No newline at end of file
diff --git a/rq/worker.py b/rq/worker.py
index bf40a65..72f1aa4 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -12,18 +12,20 @@ import sys
import time
import traceback
import warnings
+from datetime import datetime
from rq.compat import as_text, string_types, text_type
from .connections import get_current_connection
from .exceptions import DequeueTimeout, NoQueueError
-from .job import Job, Status
+from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue
from .timeouts import UnixSignalDeathPenalty
-from .utils import import_attribute, make_colorizer, utcformat, utcnow
+from .utils import import_attribute, make_colorizer, utcformat, utcnow, enum
from .version import VERSION
from .registry import FinishedJobRegistry, StartedJobRegistry
+from .suspension import is_suspended
try:
from procname import setprocname
@@ -52,8 +54,8 @@ def compact(l):
return [x for x in l if x is not None]
_signames = dict((getattr(signal, signame), signame)
- for signame in dir(signal)
- if signame.startswith('SIG') and '_' not in signame)
+ for signame in dir(signal)
+ if signame.startswith('SIG') and '_' not in signame)
def signal_name(signum):
@@ -65,6 +67,12 @@ def signal_name(signum):
return 'SIG_UNKNOWN'
+WorkerStatus = enum('WorkerStatus',
+ STARTED='started', SUSPENDED='suspended', BUSY='busy',
+ IDLE='idle'
+ )
+
+
class Worker(object):
redis_worker_namespace_prefix = 'rq:worker:'
redis_workers_keys = 'rq:workers'
@@ -333,6 +341,30 @@ class Worker(object):
signal.signal(signal.SIGINT, request_stop)
signal.signal(signal.SIGTERM, request_stop)
+ def check_for_suspension(self, burst):
+ """Check to see if the workers have been suspended by something like `rq suspend`"""
+
+ before_state = None
+ notified = False
+
+ while not self.stopped and is_suspended(self.connection):
+
+ if burst:
+ self.log.info('Suspended in burst mode -- exiting.')
+ self.log.info('Note: There could still be unperformed jobs on the queue')
+ raise StopRequested
+
+ if not notified:
+ self.log.info('Worker suspended, use "rq resume" command to resume')
+ before_state = self.get_state()
+ self.set_state(WorkerStatus.SUSPENDED)
+ notified = True
+ time.sleep(1)
+
+ if before_state:
+ self.set_state(before_state)
+
+
def work(self, burst=False):
"""Starts the work loop.
@@ -348,15 +380,19 @@ class Worker(object):
did_perform_work = False
self.register_birth()
self.log.info('RQ worker started, version %s' % VERSION)
- self.set_state('starting')
+ self.set_state(WorkerStatus.STARTED)
+
try:
while True:
- if self.stopped:
- self.log.info('Stopping on request.')
- break
-
- timeout = None if burst else max(1, self.default_worker_ttl - 60)
try:
+ self.check_for_suspension(burst)
+
+ if self.stopped:
+ self.log.info('Stopping on request.')
+ break
+
+ timeout = None if burst else max(1, self.default_worker_ttl - 60)
+
result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None:
break
@@ -367,20 +403,22 @@ class Worker(object):
self.execute_job(job)
self.heartbeat()
- if job.get_status() == Status.FINISHED:
+ if job.get_status() == JobStatus.FINISHED:
queue.enqueue_dependents(job)
did_perform_work = True
+
finally:
if not self.is_horse:
self.register_death()
return did_perform_work
+
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
qnames = self.queue_names()
- self.set_state('idle')
+ self.set_state(WorkerStatus.IDLE)
self.procline('Listening on %s' % ','.join(qnames))
self.log.info('')
self.log.info('*** Listening on %s...' %
@@ -395,7 +433,7 @@ class Worker(object):
if result is not None:
job, queue = result
self.log.info('%s: %s (%s)' % (green(queue.name),
- blue(job.description), job.id))
+ blue(job.description), job.id))
break
except DequeueTimeout:
@@ -477,12 +515,12 @@ class Worker(object):
timeout = (job.timeout or 180) + 60
with self.connection._pipeline() as pipeline:
- self.set_state('busy', pipeline=pipeline)
+ self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(timeout, pipeline=pipeline)
registry = StartedJobRegistry(job.origin, self.connection)
registry.add(job, timeout, pipeline=pipeline)
- job.set_status(Status.STARTED, pipeline=pipeline)
+ job.set_status(JobStatus.STARTED, pipeline=pipeline)
pipeline.execute()
self.procline('Processing %s from %s since %s' % (
@@ -511,7 +549,7 @@ class Worker(object):
result_ttl = job.get_ttl(self.default_result_ttl)
if result_ttl != 0:
job.ended_at = utcnow()
- job._status = Status.FINISHED
+ job._status = JobStatus.FINISHED
job.save(pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin, self.connection)
@@ -523,7 +561,7 @@ class Worker(object):
pipeline.execute()
except Exception:
- job.set_status(Status.FAILED, pipeline=pipeline)
+ job.set_status(JobStatus.FAILED, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
self.handle_exception(job, *sys.exc_info())
@@ -552,7 +590,7 @@ class Worker(object):
'arguments': job.args,
'kwargs': job.kwargs,
'queue': job.origin,
- })
+ })
for handler in reversed(self._exc_handlers):
self.log.debug('Invoking exception handler %s' % (handler,))
diff --git a/tests/test_cli.py b/tests/test_cli.py
index a92fb34..3977006 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -26,6 +26,17 @@ class TestCommandLine(TestCase):
class TestRQCli(RQTestCase):
+
+ def assert_normal_execution(self, result):
+ if result.exit_code == 0:
+ return True
+ else:
+ print("Non normal execution")
+ print("Exit Code: {}".format(result.exit_code))
+ print("Output: {}".format(result.output))
+ print("Exception: {}".format(result.exception))
+ self.assertEqual(result.exit_code, 0)
+
"""Test rq_cli script"""
def setUp(self):
super(TestRQCli, self).setUp()
@@ -41,25 +52,58 @@ class TestRQCli(RQTestCase):
"""rq empty -u <url> failed"""
runner = CliRunner()
result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
- self.assertEqual(result.exit_code, 0)
+ self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
def test_requeue(self):
"""rq requeue -u <url> --all"""
runner = CliRunner()
result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
- self.assertEqual(result.exit_code, 0)
+ self.assert_normal_execution(result)
self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
def test_info(self):
"""rq info -u <url>"""
runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url])
- self.assertEqual(result.exit_code, 0)
+ self.assert_normal_execution(result)
self.assertIn('1 queues, 1 jobs total', result.output)
def test_worker(self):
"""rq worker -u <url> -b"""
runner = CliRunner()
result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
- self.assertEqual(result.exit_code, 0)
+ self.assert_normal_execution(result)
+
+ def test_suspend_and_resume(self):
+ """rq suspend -u <url>
+ rq resume -u <url>
+ """
+ runner = CliRunner()
+ result = runner.invoke(main, ['suspend', '-u', self.redis_url])
+ self.assert_normal_execution(result)
+
+ result = runner.invoke(main, ['resume', '-u', self.redis_url])
+ self.assert_normal_execution(result)
+
+ def test_suspend_with_ttl(self):
+ """rq suspend -u <url> --duration=2
+ """
+ runner = CliRunner()
+ result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1])
+ self.assert_normal_execution(result)
+
+ def test_suspend_with_invalid_ttl(self):
+ """rq suspend -u <url> --duration=0
+ """
+ runner = CliRunner()
+ result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
+
+ self.assertEqual(result.exit_code, 1)
+ self.assertIn("Duration must be an integer greater than 1", result.output)
+
+
+
+
+
+
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 8990e35..c905f13 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError
-from rq.job import Job, Status
+from rq.job import Job, JobStatus
from rq.worker import Worker
from tests import RQTestCase
@@ -262,7 +262,7 @@ class TestQueue(RQTestCase):
"""Enqueueing a job sets its status to "queued"."""
q = Queue()
job = q.enqueue(say_hello)
- self.assertEqual(job.get_status(), Status.QUEUED)
+ self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_explicit_args(self):
"""enqueue() works for both implicit/explicit args."""
@@ -346,7 +346,7 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued
- parent_job.set_status(Status.FINISHED)
+ parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, [job.id])
@@ -363,7 +363,7 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued
- parent_job.set_status(Status.FINISHED)
+ parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job.id)
self.assertEqual(q.job_ids, [job.id])
@@ -379,7 +379,7 @@ class TestQueue(RQTestCase):
self.assertEqual(job.timeout, 123)
# Jobs dependent on finished jobs are immediately enqueued
- parent_job.set_status(Status.FINISHED)
+ parent_job.set_status(JobStatus.FINISHED)
parent_job.save()
job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
self.assertEqual(q.job_ids, [job.id])
@@ -441,7 +441,7 @@ class TestFailedQueue(RQTestCase):
get_failed_queue().requeue(job.id)
job = Job.fetch(job.id)
- self.assertEqual(job.get_status(), Status.QUEUED)
+ self.assertEqual(job.get_status(), JobStatus.QUEUED)
def test_enqueue_preserves_result_ttl(self):
"""Enqueueing persists result_ttl."""
diff --git a/tests/test_worker.py b/tests/test_worker.py
index c6d85ff..75f30aa 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -6,8 +6,9 @@ import os
from rq import get_failed_queue, Queue, Worker, SimpleWorker
from rq.compat import as_text
-from rq.job import Job, Status
+from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
+from rq.suspension import suspend, resume
from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
@@ -222,14 +223,14 @@ class TestWorker(RQTestCase):
w = Worker([q])
job = q.enqueue(say_hello)
- self.assertEqual(job.get_status(), Status.QUEUED)
+ self.assertEqual(job.get_status(), JobStatus.QUEUED)
self.assertEqual(job.is_queued, True)
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, False)
w.work(burst=True)
job = Job.fetch(job.id)
- self.assertEqual(job.get_status(), Status.FINISHED)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(job.is_queued, False)
self.assertEqual(job.is_finished, True)
self.assertEqual(job.is_failed, False)
@@ -238,7 +239,7 @@ class TestWorker(RQTestCase):
job = q.enqueue(div_by_zero, args=(1,))
w.work(burst=True)
job = Job.fetch(job.id)
- self.assertEqual(job.get_status(), Status.FAILED)
+ self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertEqual(job.is_queued, False)
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, True)
@@ -251,13 +252,13 @@ class TestWorker(RQTestCase):
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
- self.assertEqual(job.get_status(), Status.FINISHED)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
parent_job = q.enqueue(div_by_zero)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
- self.assertNotEqual(job.get_status(), Status.FINISHED)
+ self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
def test_get_current_job(self):
"""Ensure worker.get_current_job() works properly"""
@@ -318,3 +319,33 @@ class TestWorker(RQTestCase):
'Expected at least some work done.')
self.assertEquals(job.result, 'Hi there, Adam!')
self.assertEquals(job.description, '你好 世界!')
+
+ def test_pause_worker_execution(self):
+ """Test Pause Worker Execution"""
+
+ SENTINEL_FILE = '/tmp/rq-tests.txt'
+
+ try:
+ # Remove the sentinel if it is leftover from a previous test run
+ os.remove(SENTINEL_FILE)
+ except OSError as e:
+ if e.errno != 2:
+ raise
+
+ q = Queue()
+ job = q.enqueue(create_file, SENTINEL_FILE)
+
+ w = Worker([q])
+
+ suspend(self.testconn)
+
+ w.work(burst=True)
+ assert q.count == 1
+
+ # Should not have created evidence of execution
+ self.assertEquals(os.path.exists(SENTINEL_FILE), False)
+
+ resume(self.testconn)
+ w.work(burst=True)
+ assert q.count == 0
+ self.assertEquals(os.path.exists(SENTINEL_FILE), True)