diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2023-05-01 12:44:32 +0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-01 12:44:32 +0700 |
commit | 64cb1a27b9d1f2fd52bbbb5c1e4518c024f74685 (patch) | |
tree | fc792e7d75de99843bb130cd0acdfeced6d91ec0 /tests/test_cli.py | |
parent | 8a9daecaf2c6ff542ec1e4be7d8ec9ae4c8b803c (diff) | |
download | rq-64cb1a27b9d1f2fd52bbbb5c1e4518c024f74685.tar.gz |
Worker pool (#1874)
* First stab at implementating worker pool
* Use process.is_alive() to check whether a process is still live
* Handle shutdown signal
* Check worker loop done
* First working version of `WorkerPool`.
* Added test for check_workers()
* Added test for pool.start()
* Better shutdown process
* Comment out test_start() to see if it fixes CI
* Make tests pass
* Make CI pass
* Comment out some tests
* Comment out more tests
* Re-enable a test
* Re-enable another test
* Uncomment check_workers test
* Added run_worker test
* Minor modification to dead worker detection
* More test cases
* Better process name for workers
* Added back pool.stop_workers() when signal is received
* Cleaned up cli.py
* WIP on worker-pool command
* Fix test
* Test that worker pool ignores consecutive shutdown signals
* Added test for worker-pool CLI command.
* Added timeout to CI jobs
* Fix worker pool test
* Comment out test_scheduler.py
* Fixed worker-pool in burst mode
* Increase test coverage
* Exclude tests directory from coverage.py
* Improve test coverage
* Renamed `Pool(num_workers=2) to `Pool(size=2)`
* Revert "Renamed `Pool(num_workers=2) to `Pool(size=2)`"
This reverts commit a1306f89ad0d8686c6bde447bff75e2f71f0733b.
* Renamed Pool to WorkerPool
* Added a new TestCase that doesn't use LocalStack
* Added job_class, worker_class and serializer arguments to WorkerPool
* Use parse_connection() in WorkerPool.__init__
* Added CLI arguments for worker-pool
* Minor WorkerPool and test fixes
* Fixed failing CLI test
* Document WorkerPool
Diffstat (limited to 'tests/test_cli.py')
-rw-r--r-- | tests/test_cli.py | 303 |
1 files changed, 199 insertions, 104 deletions
diff --git a/tests/test_cli.py b/tests/test_cli.py index daa118b..79ac12d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,7 +12,7 @@ from redis import Redis from rq import Queue from rq.cli import main from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, parse_schedule -from rq.job import Job +from rq.job import Job, JobStatus from rq.registry import FailedJobRegistry, ScheduledJobRegistry from rq.serializers import JSONSerializer from rq.timeouts import UnixSignalDeathPenalty @@ -25,8 +25,25 @@ from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello -class TestRQCli(RQTestCase): +class CLITestCase(RQTestCase): + def setUp(self): + super().setUp() + db_num = self.testconn.connection_pool.connection_kwargs['db'] + self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num + self.connection = Redis.from_url(self.redis_url) + + def assert_normal_execution(self, result): + if result.exit_code == 0: + return True + else: + print("Non normal execution") + print("Exit Code: {}".format(result.exit_code)) + print("Output: {}".format(result.output)) + print("Exception: {}".format(result.exception)) + self.assertEqual(result.exit_code, 0) + +class TestRQCli(CLITestCase): @pytest.fixture(autouse=True) def set_tmpdir(self, tmpdir): self.tmpdir = tmpdir @@ -42,12 +59,9 @@ class TestRQCli(RQTestCase): self.assertEqual(result.exit_code, 0) """Test rq_cli script""" + def setUp(self): super().setUp() - db_num = self.testconn.connection_pool.connection_kwargs['db'] - self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num - self.connection = Redis.from_url(self.redis_url) - job = Job.create(func=div_by_zero, args=(1, 2, 3)) job.origin = 'fake' job.save() @@ -76,18 +90,9 @@ class TestRQCli(RQTestCase): cli_config.connection.connection_pool.connection_kwargs['host'], 'testhost.example.com', ) - self.assertEqual( - cli_config.connection.connection_pool.connection_kwargs['port'], - 6379 - ) - self.assertEqual( - cli_config.connection.connection_pool.connection_kwargs['db'], - 0 - ) - self.assertEqual( - cli_config.connection.connection_pool.connection_kwargs['password'], - None - ) + self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['port'], 6379) + self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['db'], 0) + self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['password'], None) def test_config_file_default_options_override(self): """""" @@ -97,18 +102,9 @@ class TestRQCli(RQTestCase): cli_config.connection.connection_pool.connection_kwargs['host'], 'testhost.example.com', ) - self.assertEqual( - cli_config.connection.connection_pool.connection_kwargs['port'], - 6378 - ) - self.assertEqual( - cli_config.connection.connection_pool.connection_kwargs['db'], - 2 - ) - self.assertEqual( - cli_config.connection.connection_pool.connection_kwargs['password'], - '123' - ) + self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['port'], 6378) + self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['db'], 2) + self.assertEqual(cli_config.connection.connection_pool.connection_kwargs['password'], '123') def test_config_env_vars(self): os.environ['REDIS_HOST'] = "testhost.example.com" @@ -123,18 +119,12 @@ class TestRQCli(RQTestCase): def test_death_penalty_class(self): cli_config = CliConfig() - self.assertEqual( - UnixSignalDeathPenalty, - cli_config.death_penalty_class - ) + self.assertEqual(UnixSignalDeathPenalty, cli_config.death_penalty_class) cli_config = CliConfig(death_penalty_class='rq.job.Job') - self.assertEqual( - Job, - cli_config.death_penalty_class - ) + self.assertEqual(Job, cli_config.death_penalty_class) - with self.assertRaises(BadParameter): + with self.assertRaises(ValueError): CliConfig(death_penalty_class='rq.abcd') def test_empty_nothing(self): @@ -163,10 +153,7 @@ class TestRQCli(RQTestCase): self.assertIn(job2, registry) self.assertIn(job3, registry) - result = runner.invoke( - main, - ['requeue', '-u', self.redis_url, '--queue', 'requeue', job.id] - ) + result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', job.id]) self.assert_normal_execution(result) # Only the first specified job is requeued @@ -174,10 +161,7 @@ class TestRQCli(RQTestCase): self.assertIn(job2, registry) self.assertIn(job3, registry) - result = runner.invoke( - main, - ['requeue', '-u', self.redis_url, '--queue', 'requeue', '--all'] - ) + result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', '--all']) self.assert_normal_execution(result) # With --all flag, all failed jobs are requeued self.assertNotIn(job2, registry) @@ -203,8 +187,7 @@ class TestRQCli(RQTestCase): self.assertIn(job3, registry) result = runner.invoke( - main, - ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', job.id] + main, ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', job.id] ) self.assert_normal_execution(result) @@ -215,7 +198,7 @@ class TestRQCli(RQTestCase): result = runner.invoke( main, - ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', '--all'] + ['requeue', '-u', self.redis_url, '--queue', 'requeue', '-S', 'rq.serializers.JSONSerializer', '--all'], ) self.assert_normal_execution(result) # With --all flag, all failed jobs are requeued @@ -257,8 +240,7 @@ class TestRQCli(RQTestCase): self.assert_normal_execution(result) self.assertIn('0 workers, 0 queue', result.output) - result = runner.invoke(main, ['info', '--by-queue', - '-u', self.redis_url, '--only-workers']) + result = runner.invoke(main, ['info', '--by-queue', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) self.assertIn('0 workers, 0 queue', result.output) @@ -288,14 +270,12 @@ class TestRQCli(RQTestCase): worker_2.register_birth() worker_2.set_state(WorkerStatus.BUSY) - result = runner.invoke(main, ['info', 'foo', 'bar', - '-u', self.redis_url, '--only-workers']) + result = runner.invoke(main, ['info', 'foo', 'bar', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) self.assertIn('2 workers, 2 queues', result.output) - result = runner.invoke(main, ['info', 'foo', 'bar', '--by-queue', - '-u', self.redis_url, '--only-workers']) + result = runner.invoke(main, ['info', 'foo', 'bar', '--by-queue', '-u', self.redis_url, '--only-workers']) self.assert_normal_execution(result) # Ensure both queues' workers are shown @@ -374,15 +354,13 @@ class TestRQCli(RQTestCase): # If disable-default-exception-handler is given, job is not moved to FailedJobRegistry job = q.enqueue(div_by_zero) - runner.invoke(main, ['worker', '-u', self.redis_url, '-b', - '--disable-default-exception-handler']) + runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--disable-default-exception-handler']) registry = FailedJobRegistry(queue=q) self.assertFalse(job in registry) # Both default and custom exception handler is run job = q.enqueue(div_by_zero) - runner.invoke(main, ['worker', '-u', self.redis_url, '-b', - '--exception-handler', 'tests.fixtures.add_meta']) + runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--exception-handler', 'tests.fixtures.add_meta']) registry = FailedJobRegistry(queue=q) self.assertTrue(job in registry) job.refresh() @@ -390,9 +368,18 @@ class TestRQCli(RQTestCase): # Only custom exception handler is run job = q.enqueue(div_by_zero) - runner.invoke(main, ['worker', '-u', self.redis_url, '-b', - '--exception-handler', 'tests.fixtures.add_meta', - '--disable-default-exception-handler']) + runner.invoke( + main, + [ + 'worker', + '-u', + self.redis_url, + '-b', + '--exception-handler', + 'tests.fixtures.add_meta', + '--disable-default-exception-handler', + ], + ) registry = FailedJobRegistry(queue=q) self.assertFalse(job in registry) job.refresh() @@ -400,8 +387,8 @@ class TestRQCli(RQTestCase): def test_suspend_and_resume(self): """rq suspend -u <url> - rq worker -u <url> -b - rq resume -u <url> + rq worker -u <url> -b + rq resume -u <url> """ runner = CliRunner() result = runner.invoke(main, ['suspend', '-u', self.redis_url]) @@ -409,24 +396,19 @@ class TestRQCli(RQTestCase): result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) self.assertEqual(result.exit_code, 1) - self.assertEqual( - result.output.strip(), - 'RQ is currently suspended, to resume job execution run "rq resume"' - ) + self.assertEqual(result.output.strip(), 'RQ is currently suspended, to resume job execution run "rq resume"') result = runner.invoke(main, ['resume', '-u', self.redis_url]) self.assert_normal_execution(result) def test_suspend_with_ttl(self): - """rq suspend -u <url> --duration=2 - """ + """rq suspend -u <url> --duration=2""" runner = CliRunner() result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 1]) self.assert_normal_execution(result) def test_suspend_with_invalid_ttl(self): - """rq suspend -u <url> --duration=0 - """ + """rq suspend -u <url> --duration=0""" runner = CliRunner() result = runner.invoke(main, ['suspend', '-u', self.redis_url, '--duration', 0]) @@ -439,8 +421,7 @@ class TestRQCli(RQTestCase): q = Queue('default', connection=connection, serializer=JSONSerializer) runner = CliRunner() job = q.enqueue(say_hello) - runner.invoke(main, ['worker', '-u', self.redis_url, - '--serializer rq.serializer.JSONSerializer']) + runner.invoke(main, ['worker', '-u', self.redis_url, '--serializer rq.serializer.JSONSerializer']) self.assertIn(job.id, q.job_ids) def test_cli_enqueue(self): @@ -458,7 +439,7 @@ class TestRQCli(RQTestCase): self.assertTrue(result.output.startswith(prefix)) self.assertTrue(result.output.endswith(suffix)) - job_id = result.output[len(prefix):-len(suffix)] + job_id = result.output[len(prefix) : -len(suffix)] queue_key = 'rq:queue:default' self.assertEqual(self.connection.llen(queue_key), 1) self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id) @@ -473,7 +454,9 @@ class TestRQCli(RQTestCase): self.assertTrue(queue.is_empty()) runner = CliRunner() - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '-S', 'rq.serializers.JSONSerializer', 'tests.fixtures.say_hello']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '-S', 'rq.serializers.JSONSerializer', 'tests.fixtures.say_hello'] + ) self.assert_normal_execution(result) prefix = 'Enqueued tests.fixtures.say_hello() with job-id \'' @@ -482,7 +465,7 @@ class TestRQCli(RQTestCase): self.assertTrue(result.output.startswith(prefix)) self.assertTrue(result.output.endswith(suffix)) - job_id = result.output[len(prefix):-len(suffix)] + job_id = result.output[len(prefix) : -len(suffix)] queue_key = 'rq:queue:default' self.assertEqual(self.connection.llen(queue_key), 1) self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id) @@ -497,9 +480,22 @@ class TestRQCli(RQTestCase): self.assertTrue(queue.is_empty()) runner = CliRunner() - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', 'hello', - ':[1, {"key": "value"}]', ':@tests/test.json', '%1, 2', 'json:=[3.0, true]', - 'nojson=abc', 'file=@tests/test.json']) + result = runner.invoke( + main, + [ + 'enqueue', + '-u', + self.redis_url, + 'tests.fixtures.echo', + 'hello', + ':[1, {"key": "value"}]', + ':@tests/test.json', + '%1, 2', + 'json:=[3.0, true]', + 'nojson=abc', + 'file=@tests/test.json', + ], + ) self.assert_normal_execution(result) job_id = self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii') @@ -523,8 +519,9 @@ class TestRQCli(RQTestCase): self.assertTrue(len(registry) == 0) runner = CliRunner() - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', - '--schedule-in', '10s']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-in', '10s'] + ) self.assert_normal_execution(result) scheduler.acquire_locks() @@ -559,8 +556,9 @@ class TestRQCli(RQTestCase): self.assertTrue(len(registry) == 0) runner = CliRunner() - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', - '--schedule-at', '2021-01-01T00:00:00']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-at', '2021-01-01T00:00:00'] + ) self.assert_normal_execution(result) scheduler.acquire_locks() @@ -578,8 +576,9 @@ class TestRQCli(RQTestCase): self.assertTrue(len(queue) == 0) self.assertTrue(len(registry) == 0) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', - '--schedule-at', '2100-01-01T00:00:00']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--schedule-at', '2100-01-01T00:00:00'] + ) self.assert_normal_execution(result) self.assertTrue(len(queue) == 0) @@ -599,12 +598,28 @@ class TestRQCli(RQTestCase): self.assertTrue(queue.is_empty()) runner = CliRunner() - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.say_hello', '--retry-max', '3', - '--retry-interval', '10', '--retry-interval', '20', '--retry-interval', '40']) + result = runner.invoke( + main, + [ + 'enqueue', + '-u', + self.redis_url, + 'tests.fixtures.say_hello', + '--retry-max', + '3', + '--retry-interval', + '10', + '--retry-interval', + '20', + '--retry-interval', + '40', + ], + ) self.assert_normal_execution(result) - job = Job.fetch(self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii'), - connection=self.connection) + job = Job.fetch( + self.connection.lrange('rq:queue:default', 0, -1)[0].decode('ascii'), connection=self.connection + ) self.assertEqual(job.retries_left, 3) self.assertEqual(job.retry_intervals, [10, 20, 40]) @@ -627,8 +642,9 @@ class TestRQCli(RQTestCase): self.assertNotEqual(result.exit_code, 0) self.assertIn('Unable to parse 1. non keyword argument as JSON.', result.output) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', - '%invalid_eval_statement']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '%invalid_eval_statement'] + ) self.assertNotEqual(result.exit_code, 0) self.assertIn('Unable to eval 1. non keyword argument as Python object.', result.output) @@ -636,8 +652,19 @@ class TestRQCli(RQTestCase): self.assertNotEqual(result.exit_code, 0) self.assertIn('You can\'t specify multiple values for the same keyword.', result.output) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, 'tests.fixtures.echo', '--schedule-in', '1s', - '--schedule-at', '2000-01-01T00:00:00']) + result = runner.invoke( + main, + [ + 'enqueue', + '-u', + self.redis_url, + 'tests.fixtures.echo', + '--schedule-in', + '1s', + '--schedule-at', + '2000-01-01T00:00:00', + ], + ) self.assertNotEqual(result.exit_code, 0) self.assertIn('You can\'t specify both --schedule-in and --schedule-at', result.output) @@ -678,19 +705,25 @@ class TestRQCli(RQTestCase): self.assertEqual((job.args, job.kwargs), (['abc'], {})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc=def']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'abc=def'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([], {'abc': 'def'})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':{"json": "abc"}']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':{"json": "abc"}'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([{'json': 'abc'}], {})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:={"json": "abc"}']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:={"json": "abc"}'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([], {'key': {'json': 'abc'}})) @@ -714,37 +747,99 @@ class TestRQCli(RQTestCase): self.assertEqual((job.args, job.kwargs), ([True], {})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%=(1, 2)']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%=(1, 2)'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([], {'key': (1, 2)})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%={"foo": True}']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key%={"foo": True}'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([], {'key': {"foo": True}})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '@tests/test.json']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', '@tests/test.json'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([open('tests/test.json', 'r').read()], {})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key=@tests/test.json']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key=@tests/test.json'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([], {'key': open('tests/test.json', 'r').read()})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':@tests/test.json']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', ':@tests/test.json'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([json.loads(open('tests/test.json', 'r').read())], {})) id = str(uuid4()) - result = runner.invoke(main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:=@tests/test.json']) + result = runner.invoke( + main, ['enqueue', '-u', self.redis_url, '--job-id', id, 'tests.fixtures.echo', 'key:=@tests/test.json'] + ) self.assert_normal_execution(result) job = Job.fetch(id) self.assertEqual((job.args, job.kwargs), ([], {'key': json.loads(open('tests/test.json', 'r').read())})) + + +class WorkerPoolCLITestCase(CLITestCase): + def test_worker_pool_burst_and_num_workers(self): + """rq worker-pool -u <url> -b -n 3""" + runner = CliRunner() + result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '-n', '3']) + self.assert_normal_execution(result) + + def test_serializer_and_queue_argument(self): + """rq worker-pool foo bar -u <url> -b""" + queue = Queue('foo', connection=self.connection, serializer=JSONSerializer) + job = queue.enqueue(say_hello, 'Hello') + queue = Queue('bar', connection=self.connection, serializer=JSONSerializer) + job_2 = queue.enqueue(say_hello, 'Hello') + runner = CliRunner() + result = runner.invoke( + main, + ['worker-pool', 'foo', 'bar', '-u', self.redis_url, '-b', '--serializer', 'rq.serializers.JSONSerializer'], + ) + self.assertEqual(job.get_status(refresh=True), JobStatus.FINISHED) + self.assertEqual(job_2.get_status(refresh=True), JobStatus.FINISHED) + + def test_worker_class_argument(self): + """rq worker-pool -u <url> -b --worker-class rq.Worker""" + runner = CliRunner() + result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.Worker']) + self.assert_normal_execution(result) + result = runner.invoke( + main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.worker.SimpleWorker'] + ) + self.assert_normal_execution(result) + + # This one fails because the worker class doesn't exist + result = runner.invoke( + main, ['worker-pool', '-u', self.redis_url, '-b', '--worker-class', 'rq.worker.NonExistantWorker'] + ) + self.assertNotEqual(result.exit_code, 0) + + def test_job_class_argument(self): + """rq worker-pool -u <url> -b --job-class rq.job.Job""" + runner = CliRunner() + result = runner.invoke(main, ['worker-pool', '-u', self.redis_url, '-b', '--job-class', 'rq.job.Job']) + self.assert_normal_execution(result) + + # This one fails because Job class doesn't exist + result = runner.invoke( + main, ['worker-pool', '-u', self.redis_url, '-b', '--job-class', 'rq.job.NonExistantJob'] + ) + self.assertNotEqual(result.exit_code, 0) |