summaryrefslogtreecommitdiff
path: root/tests/test_cli.py
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2023-05-01 12:44:32 +0700
committerGitHub <noreply@github.com>2023-05-01 12:44:32 +0700
commit64cb1a27b9d1f2fd52bbbb5c1e4518c024f74685 (patch)
treefc792e7d75de99843bb130cd0acdfeced6d91ec0 /tests/test_cli.py
parent8a9daecaf2c6ff542ec1e4be7d8ec9ae4c8b803c (diff)
downloadrq-64cb1a27b9d1f2fd52bbbb5c1e4518c024f74685.tar.gz
Worker pool (#1874)
* First stab at implementating worker pool * Use process.is_alive() to check whether a process is still live * Handle shutdown signal * Check worker loop done * First working version of `WorkerPool`. * Added test for check_workers() * Added test for pool.start() * Better shutdown process * Comment out test_start() to see if it fixes CI * Make tests pass * Make CI pass * Comment out some tests * Comment out more tests * Re-enable a test * Re-enable another test * Uncomment check_workers test * Added run_worker test * Minor modification to dead worker detection * More test cases * Better process name for workers * Added back pool.stop_workers() when signal is received * Cleaned up cli.py * WIP on worker-pool command * Fix test * Test that worker pool ignores consecutive shutdown signals * Added test for worker-pool CLI command. * Added timeout to CI jobs * Fix worker pool test * Comment out test_scheduler.py * Fixed worker-pool in burst mode * Increase test coverage * Exclude tests directory from coverage.py * Improve test coverage * Renamed `Pool(num_workers=2) to `Pool(size=2)` * Revert "Renamed `Pool(num_workers=2) to `Pool(size=2)`" This reverts commit a1306f89ad0d8686c6bde447bff75e2f71f0733b. * Renamed Pool to WorkerPool * Added a new TestCase that doesn't use LocalStack * Added job_class, worker_class and serializer arguments to WorkerPool * Use parse_connection() in WorkerPool.__init__ * Added CLI arguments for worker-pool * Minor WorkerPool and test fixes * Fixed failing CLI test * Document WorkerPool
Diffstat (limited to 'tests/test_cli.py')
-rw-r--r--tests/test_cli.py303
1 files changed, 199 insertions, 104 deletions
diff --git a/tests/test_cli.py b/tests/test_cli.py
index daa118b..79ac12d 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -12,7 +12,7 @@ from redis import Redis
from rq import Queue
from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, parse_schedule
-from rq.job import Job
+from rq.job import Job, JobStatus
from rq.registry import FailedJobRegistry, ScheduledJobRegistry
from rq.serializers import JSONSerializer
from rq.timeouts import UnixSignalDeathPenalty
@@ -25,8 +25,25 @@ from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
-class TestRQCli(RQTestCase):
+class CLITestCase(RQTestCase):
+ def setUp(self):
+ super().setUp()
+ db_num = self.testconn.connection_pool.connection_kwargs['db']
+ self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num
+ self.connection = Redis.from_url(self.redis_url)
+
+ def assert_normal_execution(self, result):
+ if result.exit_code == 0:
+ return True
+ else:
+ print("Non normal execution")
+ print("Exit Code: {}".format(result.exit_code))
+ print("Output: {}".format(result.output))
+ print("Exception: {}".format(result.exception))
+ self.assertEqual(result.exit_code, 0)
+
+class TestRQCli(CLITestCase):
@pytest.fixture(autouse=True)
def set_tmpdir(self, tmpdir):
self.tmpdir = tmpdir
@@ -42,12 +59,9 @@ class TestRQCli(RQTestCase):
self.assertEqual(result.exit_code, 0)
"""Test rq_cli script"""
+
def setUp(self):
super().setUp()
- db_num = self.testconn.connection_pool.connection_kwargs['db']
- self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num
- self.connection = Redis.from_url(self.redis_url)
-
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
@@ -76,18 +90,9 @@ class TestRQCli(RQTestCase):
cli_config.connection.connection_pool.connection_kwargs['host'],
'testhost.example.com',
)
- self.assertEqual(
- cli_config.connection.connection_pool.connection_kwargs['port'],
- 6379
- )
- self.assertEqual(
- cli_config.connection.connection_pool.connection_kwargs['db'],
- 0
- )
- self.assertEqual(
- cli_config.connection.connection_pool.connection_kwargs['password'],
- None
- )
+ self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['port'], 6379)
+ self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['db'], 0)
+ self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['password'], None)
def test_config_file_default_options_override(self):
""""""
@@ -97,18 +102,9 @@ class TestRQCli(RQTestCase):
cli_config.connection.connection_pool.connection_kwargs['host'],
'testhost.example.com',
)
- self.assertEqual(
- cli_config.connection.connection_pool.connection_kwargs['port'],
- 6378
- )
- self.assertEqual(
- cli_config.connection.connection_pool.connection_kwargs['db'],
- 2
- )
- self.assertEqual(
- cli_config.connection.connection_pool.connection_kwargs['password'],
- '123'
- )
+ self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['port'], 6378)
+ self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['db'], 2)
+ self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['password'], '123')
def test_config_env_vars(self):
os.environ['REDIS_HOST'] = "testhost.example.com"
@@ -123,18 +119,12 @@ class TestRQCli(RQTestCase):
def test_death_penalty_class(self):
cli_config = CliConfig()
- self.assertEqual(
- UnixSignalDeathPenalty,
- cli_config.death_penalty_class
- )
+ self.assertEqual(UnixSignalDeathPenalty, cli_config.death_penalty_class)
cli_config = CliConfig(death_penalty_class='rq.job.Job')
- self.assertEqual(
- Job,
- cli_config.death_penalty_class
- )
+ self.assertEqual(Job, cli_config.death_penalty_class)
- with self.assertRaises(BadParameter):
+ with self.assertRaises(ValueError):
CliConfig(death_penalty_class='rq.abcd')
def test_empty_nothing(self):
@@ -163,10 +153,7 @@ class TestRQCli(RQTestCase):
self.assertIn(job2, registry)
self.assertIn(job3, registry)
- result = runner.invoke(
- main,
- ['requeue', '-u', self.redis_url, '--queue', 'requeue', job.id]
- )
+ result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', job.id])
self.assert_normal_execution(result)
# Only the first specified job is requeued
@@ -174,10 +161,7 @@ class TestRQCli(RQTestCase):
self.assertIn(job2, registry)
self.assertIn(job3, registry)
- result = runner.invoke(
- main,
- ['requeue', '-u', self.redis_url, '--queue', 'requeue', '--all']
- )
+ result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', '--all'])
self.assert_normal_execution(result)
# With --all flag, all failed jobs are requeued
self.assertNotIn(job2, registry)
@@ -203,8 +187,7 @@ class TestRQCli(RQTestCase):
self.assertIn(job3, registry)
result = runner.invoke(
- main,
- ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', job.id]
+ main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', job.id]
)
self.assert_normal_execution(result)
@@ -215,7 +198,7 @@ class TestRQCli(RQTestCase):
result = runner.invoke(
main,
- ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', '--all']
+ ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', '--all'],
)
self.assert_normal_execution(result)
# With --all flag, all failed jobs are requeued
@@ -257,8 +240,7 @@ class TestRQCli(RQTestCase):
self.assert_normal_execution(result)
self.assertIn('0 workers, 0 queue', result.output)
- result = runner.invoke(main, ['info', '--by-queue',
- '-u', self.redis_url, '--only-workers'])
+ result = runner.invoke(main, ['info', '--by-queue', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
self.assertIn('0 workers, 0 queue', result.output)
@@ -288,14 +270,12 @@ class TestRQCli(RQTestCase):
worker_2.register_birth()
worker_2.set_state(WorkerStatus.BUSY)
- result = runner.invoke(main, ['info', 'foo', 'bar',
- '-u', self.redis_url, '--only-workers'])
+ result = runner.invoke(main, ['info', 'foo', 'bar', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
self.assertIn('2 workers, 2 queues', result.output)
- result = runner.invoke(main, ['info', 'foo', 'bar', '--by-queue',
- '-u', self.redis_url, '--only-workers'])
+ result = runner.invoke(main, ['info', 'foo', 'bar', '--by-queue', '-u', self.redis_url, '--only-workers'])
self.assert_normal_execution(result)
# Ensure both queues' workers are shown
@@ -374,15 +354,13 @@ class TestRQCli(RQTestCase):
# If disable-default-exception-handler is given, job is not moved to FailedJobRegistry
job = q.enqueue(div_by_zero)
- runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
- '--disable-default-exception-handler'])
+ runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--disable-default-exception-handler'])
registry = FailedJobRegistry(queue=q)
self.assertFalse(job in registry)
# Both default and custom exception handler is run
job = q.enqueue(div_by_zero)
- runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
- '--exception-handler', 'tests.fixtures.add_meta'])
+ runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--exception-handler', 'tests.fixtures.add_meta'])
registry = FailedJobRegistry(queue=q)
self.assertTrue(job in registry)
job.refresh()
@@ -390,9 +368,18 @@ class TestRQCli(RQTestCase):
# Only custom exception handler is run
job = q.enqueue(div_by_zero)
- runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
- '--exception-handler', 'tests.fixtures.add_meta',
- '--disable-default-exception-handler'])
+ runner.invoke(
+ main,
+ [
+ 'worker',
+ '-u',
+ self.redis_url,
+ '-b',
+ '--exception-handler',
+ 'tests.fixtures.add_meta',
+ '--disable-default-exception-handler',
+ ],
+ )
registry = FailedJobRegistry(queue=q)
self.assertFalse(job in registry)
job.refresh()
@@ -400,8 +387,8 @@ class TestRQCli(RQTestCase):
def test_suspend_and_resume(self):
"""rq suspend -u <url>
- rq worker -u <url> -b
- rq resume -u <url>
+ rq worker -u <url> -b
+ rq resume -u <url>
"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url])
@@ -409,24 +396,19 @@ class TestRQCli(RQTestCase):
result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assertEqual(result.exit_code, 1)
- self.assertEqual(
- result.output.strip(),
- 'RQ is currently suspended, to resume job execution run "rq resume"'
- )
+ self.assertEqual(result.output.strip(), 'RQ is currently suspended, to resume job execution run "rq resume"')
result = runner.invoke(main, ['resume', '-u', self.redis_url])
self.assert_normal_execution(result)
def test_suspend_with_ttl(self):
- """rq suspend -u <url> --duration=2
- """
+ """rq suspend -u <url> --duration=2"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1])
self.assert_normal_execution(result)
def test_suspend_with_invalid_ttl(self):
- """rq suspend -u <url> --duration=0
- """
+ """rq suspend -u <url> --duration=0"""
runner = CliRunner()
result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0])
@@ -439,8 +421,7 @@ class TestRQCli(RQTestCase):
q = Queue('default', connection=connection, serializer=JSONSerializer)
runner = CliRunner()
job = q.enqueue(say_hello)
- runner.invoke(main, ['worker', '-u', self.redis_url,
- '--serializer rq.serializer.JSONSerializer'])
+ runner.invoke(main, ['worker', '-u', self.redis_url, '--serializer rq.serializer.JSONSerializer'])
self.assertIn(job.id, q.job_ids)
def test_cli_enqueue(self):
@@ -458,7 +439,7 @@ class TestRQCli(RQTestCase):
self.assertTrue(result.output.startswith(prefix))
self.assertTrue(result.output.endswith(suffix))
- job_id = result.output[len(prefix):-len(suffix)]
+ job_id = result.output[len(prefix) : -len(suffix)]
queue_key = 'rq:queue:default'
self.assertEqual(self.connection.llen(queue_key), 1)
self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)
@@ -473,7 +454,9 @@ class TestRQCli(RQTestCase):
self.assertTrue(queue.is_empty())
runner = CliRunner()
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '-S', 'rq.serializers.JSONSerializer', 'tests.fixtures.say_hello'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '-S', 'rq.serializers.JSONSerializer', 'tests.fixtures.say_hello']
+ )
self.assert_normal_execution(result)
prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
@@ -482,7 +465,7 @@ class TestRQCli(RQTestCase):
self.assertTrue(result.output.startswith(prefix))
self.assertTrue(result.output.endswith(suffix))
- job_id = result.output[len(prefix):-len(suffix)]
+ job_id = result.output[len(prefix) : -len(suffix)]
queue_key = 'rq:queue:default'
self.assertEqual(self.connection.llen(queue_key), 1)
self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)
@@ -497,9 +480,22 @@ class TestRQCli(RQTestCase):
self.assertTrue(queue.is_empty())
runner = CliRunner()
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'hello',
- ':[1, {"key": "value"}]', ':@tests/test.json', '%1, 2', 'json:=[3.0, true]',
- 'nojson=abc', 'file=@tests/test.json'])
+ result = runner.invoke(
+ main,
+ [
+ 'enqueue',
+ '-u',
+ self.redis_url,
+ 'tests.fixtures.echo',
+ 'hello',
+ ':[1, {"key": "value"}]',
+ ':@tests/test.json',
+ '%1, 2',
+ 'json:=[3.0, true]',
+ 'nojson=abc',
+ 'file=@tests/test.json',
+ ],
+ )
self.assert_normal_execution(result)
job_id = self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii')
@@ -523,8 +519,9 @@ class TestRQCli(RQTestCase):
self.assertTrue(len(registry) == 0)
runner = CliRunner()
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello',
- '--schedule-in', '10s'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-in', '10s']
+ )
self.assert_normal_execution(result)
scheduler.acquire_locks()
@@ -559,8 +556,9 @@ class TestRQCli(RQTestCase):
self.assertTrue(len(registry) == 0)
runner = CliRunner()
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello',
- '--schedule-at', '2021-01-01T00:00:00'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-at', '2021-01-01T00:00:00']
+ )
self.assert_normal_execution(result)
scheduler.acquire_locks()
@@ -578,8 +576,9 @@ class TestRQCli(RQTestCase):
self.assertTrue(len(queue) == 0)
self.assertTrue(len(registry) == 0)
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello',
- '--schedule-at', '2100-01-01T00:00:00'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-at', '2100-01-01T00:00:00']
+ )
self.assert_normal_execution(result)
self.assertTrue(len(queue) == 0)
@@ -599,12 +598,28 @@ class TestRQCli(RQTestCase):
self.assertTrue(queue.is_empty())
runner = CliRunner()
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--retry-max', '3',
- '--retry-interval', '10', '--retry-interval', '20', '--retry-interval', '40'])
+ result = runner.invoke(
+ main,
+ [
+ 'enqueue',
+ '-u',
+ self.redis_url,
+ 'tests.fixtures.say_hello',
+ '--retry-max',
+ '3',
+ '--retry-interval',
+ '10',
+ '--retry-interval',
+ '20',
+ '--retry-interval',
+ '40',
+ ],
+ )
self.assert_normal_execution(result)
- job = Job.fetch(self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii'),
- connection=self.connection)
+ job = Job.fetch(
+ self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii'), connection=self.connection
+ )
self.assertEqual(job.retries_left, 3)
self.assertEqual(job.retry_intervals, [10, 20, 40])
@@ -627,8 +642,9 @@ class TestRQCli(RQTestCase):
self.assertNotEqual(result.exit_code, 0)
self.assertIn('Unable to parse 1. non keyword argument as JSON.', result.output)
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo',
- '%invalid_eval_statement'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '%invalid_eval_statement']
+ )
self.assertNotEqual(result.exit_code, 0)
self.assertIn('Unable to eval 1. non keyword argument as Python object.', result.output)
@@ -636,8 +652,19 @@ class TestRQCli(RQTestCase):
self.assertNotEqual(result.exit_code, 0)
self.assertIn('You can\'t specify multiple values for the same keyword.', result.output)
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '--schedule-in', '1s',
- '--schedule-at', '2000-01-01T00:00:00'])
+ result = runner.invoke(
+ main,
+ [
+ 'enqueue',
+ '-u',
+ self.redis_url,
+ 'tests.fixtures.echo',
+ '--schedule-in',
+ '1s',
+ '--schedule-at',
+ '2000-01-01T00:00:00',
+ ],
+ )
self.assertNotEqual(result.exit_code, 0)
self.assertIn('You can\'t specify both --schedule-in and --schedule-at', result.output)
@@ -678,19 +705,25 @@ class TestRQCli(RQTestCase):
self.assertEqual((job.args, job.kwargs), (['abc'], {}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc=def'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc=def']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([], {'abc': 'def'}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':{"json": "abc"}'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':{"json": "abc"}']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([{'json': 'abc'}], {}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:={"json": "abc"}'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:={"json": "abc"}']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([], {'key': {'json': 'abc'}}))
@@ -714,37 +747,99 @@ class TestRQCli(RQTestCase):
self.assertEqual((job.args, job.kwargs), ([True], {}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%=(1, 2)'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%=(1, 2)']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([], {'key': (1, 2)}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%={"foo": True}'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%={"foo": True}']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([], {'key': {"foo": True}}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '@tests/test.json'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '@tests/test.json']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([open('tests/test.json', 'r').read()], {}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key=@tests/test.json'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key=@tests/test.json']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([], {'key': open('tests/test.json', 'r').read()}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':@tests/test.json'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':@tests/test.json']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([json.loads(open('tests/test.json', 'r').read())], {}))
id = str(uuid4())
- result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:=@tests/test.json'])
+ result = runner.invoke(
+ main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:=@tests/test.json']
+ )
self.assert_normal_execution(result)
job = Job.fetch(id)
self.assertEqual((job.args, job.kwargs), ([], {'key': json.loads(open('tests/test.json', 'r').read())}))
+
+
+class WorkerPoolCLITestCase(CLITestCase):
+ def test_worker_pool_burst_and_num_workers(self):
+ """rq worker-pool -u <url> -b -n 3"""
+ runner = CliRunner()
+ result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '-n', '3'])
+ self.assert_normal_execution(result)
+
+ def test_serializer_and_queue_argument(self):
+ """rq worker-pool foo bar -u <url> -b"""
+ queue = Queue('foo', connection=self.connection, serializer=JSONSerializer)
+ job = queue.enqueue(say_hello, 'Hello')
+ queue = Queue('bar', connection=self.connection, serializer=JSONSerializer)
+ job_2 = queue.enqueue(say_hello, 'Hello')
+ runner = CliRunner()
+ result = runner.invoke(
+ main,
+ ['worker-pool', 'foo', 'bar', '-u', self.redis_url, '-b', '--serializer', 'rq.serializers.JSONSerializer'],
+ )
+ self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED)
+ self.assertEqual(job_2.get_status(refresh=True), JobStatus.FINISHED)
+
+ def test_worker_class_argument(self):
+ """rq worker-pool -u <url> -b --worker-class rq.Worker"""
+ runner = CliRunner()
+ result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.Worker'])
+ self.assert_normal_execution(result)
+ result = runner.invoke(
+ main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.worker.SimpleWorker']
+ )
+ self.assert_normal_execution(result)
+
+ # This one fails because the worker class doesn't exist
+ result = runner.invoke(
+ main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.worker.NonExistantWorker']
+ )
+ self.assertNotEqual(result.exit_code, 0)
+
+ def test_job_class_argument(self):
+ """rq worker-pool -u <url> -b --job-class rq.job.Job"""
+ runner = CliRunner()
+ result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--job-class', 'rq.job.Job'])
+ self.assert_normal_execution(result)
+
+ # This one fails because Job class doesn't exist
+ result = runner.invoke(
+ main, ['worker-pool', '-u', self.redis_url, '-b', '--job-class', 'rq.job.NonExistantJob']
+ )
+ self.assertNotEqual(result.exit_code, 0)