diff options
author | Selwin Ong <selwin.ong@gmail.com> | 2023-03-23 11:23:50 +0700 |
---|---|---|
committer | Selwin Ong <selwin.ong@gmail.com> | 2023-03-23 11:23:50 +0700 |
commit | a4a74ee377b97b8b7ce57b020c45b21ede56cff2 (patch) | |
tree | ef4c0e7c1c62f485f00f7b85277beecaf515c120 /tests | |
parent | 63ed6216043ba2b5c3d18703d9945b0ecb25259e (diff) | |
parent | 04722339d7598ff0c52f11c3680ed2dd922e6768 (diff) | |
download | rq-watcher.tar.gz |
Merge branch 'master' of github.com:rq/rq into watcherwatcher
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_cli.py | 34 | ||||
-rw-r--r-- | tests/test_connection.py | 33 | ||||
-rw-r--r-- | tests/test_dependencies.py | 31 | ||||
-rw-r--r-- | tests/test_helpers.py | 48 | ||||
-rw-r--r-- | tests/test_job.py | 33 | ||||
-rw-r--r-- | tests/test_registry.py | 9 | ||||
-rw-r--r-- | tests/test_results.py | 22 | ||||
-rw-r--r-- | tests/test_timeouts.py | 7 | ||||
-rw-r--r-- | tests/test_worker.py | 95 |
9 files changed, 270 insertions, 42 deletions
diff --git a/tests/test_cli.py b/tests/test_cli.py index 07b9c39..daa118b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -5,6 +5,7 @@ from uuid import uuid4 import os import json +from click import BadParameter from click.testing import CliRunner from redis import Redis @@ -14,6 +15,7 @@ from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, pars from rq.job import Job from rq.registry import FailedJobRegistry, ScheduledJobRegistry from rq.serializers import JSONSerializer +from rq.timeouts import UnixSignalDeathPenalty from rq.worker import Worker, WorkerStatus from rq.scheduler import RQScheduler @@ -118,6 +120,23 @@ class TestRQCli(RQTestCase): 'testhost.example.com', ) + def test_death_penalty_class(self): + cli_config = CliConfig() + + self.assertEqual( + UnixSignalDeathPenalty, + cli_config.death_penalty_class + ) + + cli_config = CliConfig(death_penalty_class='rq.job.Job') + self.assertEqual( + Job, + cli_config.death_penalty_class + ) + + with self.assertRaises(BadParameter): + CliConfig(death_penalty_class='rq.abcd') + def test_empty_nothing(self): """rq empty -u <url>""" runner = CliRunner() @@ -326,6 +345,21 @@ class TestRQCli(RQTestCase): result = runner.invoke(main, args + ['--quiet', '--verbose']) self.assertNotEqual(result.exit_code, 0) + def test_worker_dequeue_strategy(self): + """--quiet and --verbose logging options are supported""" + runner = CliRunner() + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'random'] + result = runner.invoke(main, args) + self.assert_normal_execution(result) + + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'round_robin'] + result = runner.invoke(main, args) + self.assert_normal_execution(result) + + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'wrong'] + result = runner.invoke(main, args) + self.assertEqual(result.exit_code, 1) + def test_exception_handlers(self): """rq worker -u <url> -b --exception-handler <handler>""" connection = Redis.from_url(self.redis_url) diff --git a/tests/test_connection.py b/tests/test_connection.py index fdfafbd..393c20d 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,9 +1,7 @@ from redis import Redis -from rq import Connection, Queue, use_connection, get_current_connection, pop_connection -from rq.connections import NoRedisConnectionException - -from tests import find_empty_redis_database, RQTestCase +from rq import Connection, Queue +from tests import RQTestCase, find_empty_redis_database from tests.fixtures import do_nothing @@ -37,30 +35,3 @@ class TestConnectionInheritance(RQTestCase): job2 = q2.enqueue(do_nothing) self.assertEqual(q1.connection, job1.connection) self.assertEqual(q2.connection, job2.connection) - - -class TestConnectionHelpers(RQTestCase): - def test_use_connection(self): - """Test function use_connection works as expected.""" - conn = new_connection() - use_connection(conn) - - self.assertEqual(conn, get_current_connection()) - - use_connection() - - self.assertNotEqual(conn, get_current_connection()) - - use_connection(self.testconn) # Restore RQTestCase connection - - with self.assertRaises(AssertionError): - with Connection(new_connection()): - use_connection() - with Connection(new_connection()): - use_connection() - - def test_resolve_connection_raises_on_no_connection(self): - """Test function resolve_connection raises if there is no connection.""" - pop_connection() - with self.assertRaises(NoRedisConnectionException): - Queue() diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 26b115d..a290a87 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -117,6 +117,37 @@ class TestDependencies(RQTestCase): self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"]) + def test_dependency_list_in_depends_on(self): + """Enqueue with Dependency list in depends_on""" + q = Queue(connection=self.testconn) + w = SimpleWorker([q], connection=q.connection) + + # enqueue dependent job when parent successfully finishes + parent_job1 = q.enqueue(say_hello) + parent_job2 = q.enqueue(say_hello) + job = q.enqueue_call(say_hello, depends_on=[Dependency([parent_job1]), Dependency([parent_job2])]) + w.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + + + def test_enqueue_job_dependency(self): + """Enqueue via Queue.enqueue_job() with depencency""" + q = Queue(connection=self.testconn) + w = SimpleWorker([q], connection=q.connection) + + # enqueue dependent job when parent successfully finishes + parent_job = Job.create(say_hello) + parent_job.save() + job = Job.create(say_hello, depends_on=parent_job) + q.enqueue_job(job) + w.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + q.enqueue_job(parent_job) + w.work(burst=True) + self.assertEqual(parent_job.get_status(), JobStatus.FINISHED) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + + def test_dependencies_are_met_if_parent_is_canceled(self): """When parent job is canceled, it should be treated as failed""" queue = Queue(connection=self.testconn) diff --git a/tests/test_helpers.py b/tests/test_helpers.py index b43f13b..5a84f71 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,11 +1,12 @@ from rq.cli.helpers import get_redis_from_config from tests import RQTestCase - +from unittest import mock class TestHelpers(RQTestCase): - def test_get_redis_from_config(self): + @mock.patch('rq.cli.helpers.Sentinel') + def test_get_redis_from_config(self, sentinel_class_mock): """Ensure Redis connection params are properly parsed""" settings = { 'REDIS_URL': 'redis://localhost:1/1' @@ -39,3 +40,46 @@ class TestHelpers(RQTestCase): self.assertEqual(connection_kwargs['db'], 2) self.assertEqual(connection_kwargs['port'], 2) self.assertEqual(connection_kwargs['password'], 'bar') + + # Add Sentinel to the settings + settings.update({ + 'SENTINEL': { + 'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)], + 'MASTER_NAME': 'master', + 'DB': 2, + 'USERNAME': 'redis-user', + 'PASSWORD': 'redis-secret', + 'SOCKET_TIMEOUT': None, + 'CONNECTION_KWARGS': { + 'ssl_ca_path': None, + }, + 'SENTINEL_KWARGS': { + 'username': 'sentinel-user', + 'password': 'sentinel-secret', + }, + }, + }) + + # Ensure SENTINEL is preferred against REDIS_* parameters + redis = get_redis_from_config(settings) + sentinel_init_sentinels_args = sentinel_class_mock.call_args[0] + sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1] + self.assertEqual( + sentinel_init_sentinels_args, + ([('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],) + ) + self.assertDictEqual( + sentinel_init_sentinel_kwargs, + { + 'db': 2, + 'ssl': False, + 'username': 'redis-user', + 'password': 'redis-secret', + 'socket_timeout': None, + 'ssl_ca_path': None, + 'sentinel_kwargs': { + 'username': 'sentinel-user', + 'password': 'sentinel-secret', + } + } + ) diff --git a/tests/test_job.py b/tests/test_job.py index 9d1ceae..23bbd11 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,4 +1,6 @@ import json + +from rq.defaults import CALLBACK_TIMEOUT from rq.serializers import JSONSerializer import time import queue @@ -9,7 +11,7 @@ from redis import WatchError from rq.utils import as_text from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError -from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job +from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job, Callback from rq.queue import Queue from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry, @@ -209,9 +211,9 @@ class TestJob(RQTestCase): # ... and no other keys are stored self.assertEqual( - set(self.testconn.hkeys(job.key)), {b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at', - b'worker_name', b'success_callback_name', b'failure_callback_name'} + b'worker_name', b'success_callback_name', b'failure_callback_name'}, + set(self.testconn.hkeys(job.key)) ) self.assertEqual(job.last_heartbeat, None) @@ -241,6 +243,31 @@ class TestJob(RQTestCase): self.assertEqual(stored_job.dependency.id, parent_job.id) self.assertEqual(stored_job.dependency, parent_job) + def test_persistence_of_callbacks(self): + """Storing jobs with success and/or failure callbacks.""" + job = Job.create(func=fixtures.some_calculation, + on_success=Callback(fixtures.say_hello, timeout=10), + on_failure=fixtures.say_pid) # deprecated callable + job.save() + stored_job = Job.fetch(job.id) + + self.assertEqual(fixtures.say_hello, stored_job.success_callback) + self.assertEqual(10, stored_job.success_callback_timeout) + self.assertEqual(fixtures.say_pid, stored_job.failure_callback) + self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout) + + # None(s) + job = Job.create(func=fixtures.some_calculation, + on_failure=None) + job.save() + stored_job = Job.fetch(job.id) + self.assertIsNone(stored_job.success_callback) + self.assertEqual(CALLBACK_TIMEOUT, job.success_callback_timeout) # timeout should be never none + self.assertEqual(CALLBACK_TIMEOUT, stored_job.success_callback_timeout) + self.assertIsNone(stored_job.failure_callback) + self.assertEqual(CALLBACK_TIMEOUT, job.failure_callback_timeout) # timeout should be never none + self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout) + def test_store_then_fetch(self): """Store, then fetch.""" job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4), diff --git a/tests/test_registry.py b/tests/test_registry.py index 28a29ca..57584b5 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,9 +1,12 @@ from datetime import datetime, timedelta +from unittest import mock +from unittest.mock import PropertyMock, ANY + from rq.serializers import JSONSerializer from rq.utils import as_text from rq.defaults import DEFAULT_FAILURE_TTL -from rq.exceptions import InvalidJobOperation +from rq.exceptions import InvalidJobOperation, AbandonedJobError from rq.job import Job, JobStatus, requeue_job from rq.queue import Queue from rq.utils import current_timestamp @@ -161,7 +164,9 @@ class TestRegistry(RQTestCase): self.assertNotIn(job, failed_job_registry) self.assertIn(job, self.registry) - self.registry.cleanup() + with mock.patch.object(Job, 'execute_failure_callback') as mocked: + self.registry.cleanup() + mocked.assert_called_once_with(queue.death_penalty_class, AbandonedJobError, ANY, ANY) self.assertIn(job.id, failed_job_registry) self.assertNotIn(job, self.registry) job.refresh() diff --git a/tests/test_results.py b/tests/test_results.py index 4f705f5..9bc1b9e 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -214,3 +214,25 @@ class TestScheduledJobRegistry(RQTestCase): job = queue.enqueue(div_by_zero) self.assertEqual(job.latest_result().type, Result.Type.FAILED) + + def test_job_return_value_result_ttl_infinity(self): + """Test job.return_value when queue.result_ttl=-1""" + queue = Queue(connection=self.connection, result_ttl=-1) + job = queue.enqueue(say_hello) + + # Returns None when there's no result + self.assertIsNone(job.return_value()) + + Result.create(job, Result.Type.SUCCESSFUL, ttl=-1, return_value=1) + self.assertEqual(job.return_value(), 1) + + def test_job_return_value_result_ttl_zero(self): + """Test job.return_value when queue.result_ttl=0""" + queue = Queue(connection=self.connection, result_ttl=0) + job = queue.enqueue(say_hello) + + # Returns None when there's no result + self.assertIsNone(job.return_value()) + + Result.create(job, Result.Type.SUCCESSFUL, ttl=0, return_value=1) + self.assertIsNone(job.return_value()) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 2872ee0..1f392a3 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -42,3 +42,10 @@ class TestTimeouts(RQTestCase): self.assertIn(job, failed_job_registry)
job.refresh()
self.assertIn("rq.timeouts.JobTimeoutException", job.exc_info)
+
+ # Test negative timeout doesn't raise JobTimeoutException,
+ # which implies an unintended immediate timeout.
+ job = q.enqueue(thread_friendly_sleep_func, args=(1,), job_timeout=-1)
+ w.work(burst=True)
+ job.refresh()
+ self.assertIn(job, finished_job_registry)
diff --git a/tests/test_worker.py b/tests/test_worker.py index cfce473..dfa0f1d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -19,6 +19,7 @@ import pytest from unittest import mock from unittest.mock import Mock +from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL from tests import RQTestCase, slow from tests.fixtures import ( access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing, @@ -607,6 +608,31 @@ class TestWorker(RQTestCase): # Should not have created evidence of execution self.assertEqual(os.path.exists(SENTINEL_FILE), False) + @slow + def test_max_idle_time(self): + q = Queue() + w = Worker([q]) + q.enqueue(say_hello, args=('Frank',)) + self.assertIsNotNone(w.dequeue_job_and_maintain_ttl(1)) + + # idle for 1 second + self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=1)) + + # idle for 3 seconds + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=3)) + self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer + + # idle for 2 seconds because idle_time is less than timeout + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2)) + self.assertLess((utcnow()-now).total_seconds(), 4) # 4 for some buffer + + # idle for 3 seconds because idle_time is less than two rounds of timeout + now = utcnow() + self.assertIsNone(w.dequeue_job_and_maintain_ttl(2, max_idle_time=3)) + self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer + @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -639,7 +665,6 @@ class TestWorker(RQTestCase): q = Queue() w = Worker([q]) - # Put it on the queue with a timeout value self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) def test_worker_ttl_param_resolves_timeout(self): @@ -936,7 +961,15 @@ class TestWorker(RQTestCase): worker.last_cleaned_at = utcnow() self.assertFalse(worker.should_run_maintenance_tasks) - worker.last_cleaned_at = utcnow() - timedelta(seconds=3700) + worker.last_cleaned_at = utcnow() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 100) + self.assertTrue(worker.should_run_maintenance_tasks) + + # custom maintenance_interval + worker = Worker(queue, maintenance_interval=10) + self.assertTrue(worker.should_run_maintenance_tasks) + worker.last_cleaned_at = utcnow() + self.assertFalse(worker.should_run_maintenance_tasks) + worker.last_cleaned_at = utcnow() - timedelta(seconds=11) self.assertTrue(worker.should_run_maintenance_tasks) def test_worker_calls_clean_registries(self): @@ -1103,6 +1136,59 @@ class TestWorker(RQTestCase): worker = Worker.find_by_key(w2.key) self.assertEqual(worker.python_version, python_version) + def test_dequeue_random_strategy(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) + + w = Worker(qs) + w.work(burst=True, dequeue_strategy="random") + + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)] + expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)] + + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + expected_rr.reverse() + expected_ser.reverse() + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + sorted_ids.sort() + expected_ser.sort() + self.assertEqual(sorted_ids, expected_ser) + + def test_dequeue_round_robin(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) + + w = Worker(qs) + w.work(burst=True, dequeue_strategy="round_robin") + + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0', + 'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1', + 'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2'] + + self.assertEqual(expected, sorted_ids) + def wait_and_kill_work_horse(pid, time_to_wait=0.0): time.sleep(time_to_wait) @@ -1224,13 +1310,14 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): w.prepare_job_execution(job) w.fork_work_horse(job, queue) job.timeout = 5 - time.sleep(1) with open(sentinel_file) as f: subprocess_pid = int(f.read().strip()) self.assertTrue(psutil.pid_exists(subprocess_pid)) - w.monitor_work_horse(job, queue) + with mock.patch.object(w, 'handle_work_horse_killed', wraps=w.handle_work_horse_killed) as mocked: + w.monitor_work_horse(job, queue) + self.assertEqual(mocked.call_count, 1) fudge_factor = 1 total_time = w.job_monitoring_interval + 65 + fudge_factor |