summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2023-03-23 11:23:50 +0700
committerSelwin Ong <selwin.ong@gmail.com>2023-03-23 11:23:50 +0700
commita4a74ee377b97b8b7ce57b020c45b21ede56cff2 (patch)
treeef4c0e7c1c62f485f00f7b85277beecaf515c120 /tests
parent63ed6216043ba2b5c3d18703d9945b0ecb25259e (diff)
parent04722339d7598ff0c52f11c3680ed2dd922e6768 (diff)
downloadrq-watcher.tar.gz
Merge branch 'master' of github.com:rq/rq into watcherwatcher
Diffstat (limited to 'tests')
-rw-r--r--tests/test_cli.py34
-rw-r--r--tests/test_connection.py33
-rw-r--r--tests/test_dependencies.py31
-rw-r--r--tests/test_helpers.py48
-rw-r--r--tests/test_job.py33
-rw-r--r--tests/test_registry.py9
-rw-r--r--tests/test_results.py22
-rw-r--r--tests/test_timeouts.py7
-rw-r--r--tests/test_worker.py95
9 files changed, 270 insertions, 42 deletions
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 07b9c39..daa118b 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -5,6 +5,7 @@ from uuid import uuid4
import os
import json
+from click import BadParameter
from click.testing import CliRunner
from redis import Redis
@@ -14,6 +15,7 @@ from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, pars
from rq.job import Job
from rq.registry import FailedJobRegistry, ScheduledJobRegistry
from rq.serializers import JSONSerializer
+from rq.timeouts import UnixSignalDeathPenalty
from rq.worker import Worker, WorkerStatus
from rq.scheduler import RQScheduler
@@ -118,6 +120,23 @@ class TestRQCli(RQTestCase):
'testhost.example.com',
)
+ def test_death_penalty_class(self):
+ cli_config = CliConfig()
+
+ self.assertEqual(
+ UnixSignalDeathPenalty,
+ cli_config.death_penalty_class
+ )
+
+ cli_config = CliConfig(death_penalty_class='rq.job.Job')
+ self.assertEqual(
+ Job,
+ cli_config.death_penalty_class
+ )
+
+ with self.assertRaises(BadParameter):
+ CliConfig(death_penalty_class='rq.abcd')
+
def test_empty_nothing(self):
"""rq empty -u <url>"""
runner = CliRunner()
@@ -326,6 +345,21 @@ class TestRQCli(RQTestCase):
result = runner.invoke(main, args + ['--quiet', '--verbose'])
self.assertNotEqual(result.exit_code, 0)
+ def test_worker_dequeue_strategy(self):
+ """--quiet and --verbose logging options are supported"""
+ runner = CliRunner()
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'random']
+ result = runner.invoke(main, args)
+ self.assert_normal_execution(result)
+
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'round_robin']
+ result = runner.invoke(main, args)
+ self.assert_normal_execution(result)
+
+ args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'wrong']
+ result = runner.invoke(main, args)
+ self.assertEqual(result.exit_code, 1)
+
def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>"""
connection = Redis.from_url(self.redis_url)
diff --git a/tests/test_connection.py b/tests/test_connection.py
index fdfafbd..393c20d 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -1,9 +1,7 @@
from redis import Redis
-from rq import Connection, Queue, use_connection, get_current_connection, pop_connection
-from rq.connections import NoRedisConnectionException
-
-from tests import find_empty_redis_database, RQTestCase
+from rq import Connection, Queue
+from tests import RQTestCase, find_empty_redis_database
from tests.fixtures import do_nothing
@@ -37,30 +35,3 @@ class TestConnectionInheritance(RQTestCase):
job2 = q2.enqueue(do_nothing)
self.assertEqual(q1.connection, job1.connection)
self.assertEqual(q2.connection, job2.connection)
-
-
-class TestConnectionHelpers(RQTestCase):
- def test_use_connection(self):
- """Test function use_connection works as expected."""
- conn = new_connection()
- use_connection(conn)
-
- self.assertEqual(conn, get_current_connection())
-
- use_connection()
-
- self.assertNotEqual(conn, get_current_connection())
-
- use_connection(self.testconn) # Restore RQTestCase connection
-
- with self.assertRaises(AssertionError):
- with Connection(new_connection()):
- use_connection()
- with Connection(new_connection()):
- use_connection()
-
- def test_resolve_connection_raises_on_no_connection(self):
- """Test function resolve_connection raises if there is no connection."""
- pop_connection()
- with self.assertRaises(NoRedisConnectionException):
- Queue()
diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py
index 26b115d..a290a87 100644
--- a/tests/test_dependencies.py
+++ b/tests/test_dependencies.py
@@ -117,6 +117,37 @@ class TestDependencies(RQTestCase):
self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"])
+ def test_dependency_list_in_depends_on(self):
+ """Enqueue with Dependency list in depends_on"""
+ q = Queue(connection=self.testconn)
+ w = SimpleWorker([q], connection=q.connection)
+
+ # enqueue dependent job when parent successfully finishes
+ parent_job1 = q.enqueue(say_hello)
+ parent_job2 = q.enqueue(say_hello)
+ job = q.enqueue_call(say_hello, depends_on=[Dependency([parent_job1]), Dependency([parent_job2])])
+ w.work(burst=True)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
+
+ def test_enqueue_job_dependency(self):
+ """Enqueue via Queue.enqueue_job() with depencency"""
+ q = Queue(connection=self.testconn)
+ w = SimpleWorker([q], connection=q.connection)
+
+ # enqueue dependent job when parent successfully finishes
+ parent_job = Job.create(say_hello)
+ parent_job.save()
+ job = Job.create(say_hello, depends_on=parent_job)
+ q.enqueue_job(job)
+ w.work(burst=True)
+ self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+ q.enqueue_job(parent_job)
+ w.work(burst=True)
+ self.assertEqual(parent_job.get_status(), JobStatus.FINISHED)
+ self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
+
def test_dependencies_are_met_if_parent_is_canceled(self):
"""When parent job is canceled, it should be treated as failed"""
queue = Queue(connection=self.testconn)
diff --git a/tests/test_helpers.py b/tests/test_helpers.py
index b43f13b..5a84f71 100644
--- a/tests/test_helpers.py
+++ b/tests/test_helpers.py
@@ -1,11 +1,12 @@
from rq.cli.helpers import get_redis_from_config
from tests import RQTestCase
-
+from unittest import mock
class TestHelpers(RQTestCase):
- def test_get_redis_from_config(self):
+ @mock.patch('rq.cli.helpers.Sentinel')
+ def test_get_redis_from_config(self, sentinel_class_mock):
"""Ensure Redis connection params are properly parsed"""
settings = {
'REDIS_URL': 'redis://localhost:1/1'
@@ -39,3 +40,46 @@ class TestHelpers(RQTestCase):
self.assertEqual(connection_kwargs['db'], 2)
self.assertEqual(connection_kwargs['port'], 2)
self.assertEqual(connection_kwargs['password'], 'bar')
+
+ # Add Sentinel to the settings
+ settings.update({
+ 'SENTINEL': {
+ 'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],
+ 'MASTER_NAME': 'master',
+ 'DB': 2,
+ 'USERNAME': 'redis-user',
+ 'PASSWORD': 'redis-secret',
+ 'SOCKET_TIMEOUT': None,
+ 'CONNECTION_KWARGS': {
+ 'ssl_ca_path': None,
+ },
+ 'SENTINEL_KWARGS': {
+ 'username': 'sentinel-user',
+ 'password': 'sentinel-secret',
+ },
+ },
+ })
+
+ # Ensure SENTINEL is preferred against REDIS_* parameters
+ redis = get_redis_from_config(settings)
+ sentinel_init_sentinels_args = sentinel_class_mock.call_args[0]
+ sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1]
+ self.assertEqual(
+ sentinel_init_sentinels_args,
+ ([('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],)
+ )
+ self.assertDictEqual(
+ sentinel_init_sentinel_kwargs,
+ {
+ 'db': 2,
+ 'ssl': False,
+ 'username': 'redis-user',
+ 'password': 'redis-secret',
+ 'socket_timeout': None,
+ 'ssl_ca_path': None,
+ 'sentinel_kwargs': {
+ 'username': 'sentinel-user',
+ 'password': 'sentinel-secret',
+ }
+ }
+ )
diff --git a/tests/test_job.py b/tests/test_job.py
index 9d1ceae..23bbd11 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -1,4 +1,6 @@
import json
+
+from rq.defaults import CALLBACK_TIMEOUT
from rq.serializers import JSONSerializer
import time
import queue
@@ -9,7 +11,7 @@ from redis import WatchError
from rq.utils import as_text
from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
-from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job
+from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job, Callback
from rq.queue import Queue
from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry,
@@ -209,9 +211,9 @@ class TestJob(RQTestCase):
# ... and no other keys are stored
self.assertEqual(
- set(self.testconn.hkeys(job.key)),
{b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at',
- b'worker_name', b'success_callback_name', b'failure_callback_name'}
+ b'worker_name', b'success_callback_name', b'failure_callback_name'},
+ set(self.testconn.hkeys(job.key))
)
self.assertEqual(job.last_heartbeat, None)
@@ -241,6 +243,31 @@ class TestJob(RQTestCase):
self.assertEqual(stored_job.dependency.id, parent_job.id)
self.assertEqual(stored_job.dependency, parent_job)
+ def test_persistence_of_callbacks(self):
+ """Storing jobs with success and/or failure callbacks."""
+ job = Job.create(func=fixtures.some_calculation,
+ on_success=Callback(fixtures.say_hello, timeout=10),
+ on_failure=fixtures.say_pid) # deprecated callable
+ job.save()
+ stored_job = Job.fetch(job.id)
+
+ self.assertEqual(fixtures.say_hello, stored_job.success_callback)
+ self.assertEqual(10, stored_job.success_callback_timeout)
+ self.assertEqual(fixtures.say_pid, stored_job.failure_callback)
+ self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout)
+
+ # None(s)
+ job = Job.create(func=fixtures.some_calculation,
+ on_failure=None)
+ job.save()
+ stored_job = Job.fetch(job.id)
+ self.assertIsNone(stored_job.success_callback)
+ self.assertEqual(CALLBACK_TIMEOUT, job.success_callback_timeout) # timeout should be never none
+ self.assertEqual(CALLBACK_TIMEOUT, stored_job.success_callback_timeout)
+ self.assertIsNone(stored_job.failure_callback)
+ self.assertEqual(CALLBACK_TIMEOUT, job.failure_callback_timeout) # timeout should be never none
+ self.assertEqual(CALLBACK_TIMEOUT, stored_job.failure_callback_timeout)
+
def test_store_then_fetch(self):
"""Store, then fetch."""
job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4),
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 28a29ca..57584b5 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -1,9 +1,12 @@
from datetime import datetime, timedelta
+from unittest import mock
+from unittest.mock import PropertyMock, ANY
+
from rq.serializers import JSONSerializer
from rq.utils import as_text
from rq.defaults import DEFAULT_FAILURE_TTL
-from rq.exceptions import InvalidJobOperation
+from rq.exceptions import InvalidJobOperation, AbandonedJobError
from rq.job import Job, JobStatus, requeue_job
from rq.queue import Queue
from rq.utils import current_timestamp
@@ -161,7 +164,9 @@ class TestRegistry(RQTestCase):
self.assertNotIn(job, failed_job_registry)
self.assertIn(job, self.registry)
- self.registry.cleanup()
+ with mock.patch.object(Job, 'execute_failure_callback') as mocked:
+ self.registry.cleanup()
+ mocked.assert_called_once_with(queue.death_penalty_class, AbandonedJobError, ANY, ANY)
self.assertIn(job.id, failed_job_registry)
self.assertNotIn(job, self.registry)
job.refresh()
diff --git a/tests/test_results.py b/tests/test_results.py
index 4f705f5..9bc1b9e 100644
--- a/tests/test_results.py
+++ b/tests/test_results.py
@@ -214,3 +214,25 @@ class TestScheduledJobRegistry(RQTestCase):
job = queue.enqueue(div_by_zero)
self.assertEqual(job.latest_result().type, Result.Type.FAILED)
+
+ def test_job_return_value_result_ttl_infinity(self):
+ """Test job.return_value when queue.result_ttl=-1"""
+ queue = Queue(connection=self.connection, result_ttl=-1)
+ job = queue.enqueue(say_hello)
+
+ # Returns None when there's no result
+ self.assertIsNone(job.return_value())
+
+ Result.create(job, Result.Type.SUCCESSFUL, ttl=-1, return_value=1)
+ self.assertEqual(job.return_value(), 1)
+
+ def test_job_return_value_result_ttl_zero(self):
+ """Test job.return_value when queue.result_ttl=0"""
+ queue = Queue(connection=self.connection, result_ttl=0)
+ job = queue.enqueue(say_hello)
+
+ # Returns None when there's no result
+ self.assertIsNone(job.return_value())
+
+ Result.create(job, Result.Type.SUCCESSFUL, ttl=0, return_value=1)
+ self.assertIsNone(job.return_value())
diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py
index 2872ee0..1f392a3 100644
--- a/tests/test_timeouts.py
+++ b/tests/test_timeouts.py
@@ -42,3 +42,10 @@ class TestTimeouts(RQTestCase):
self.assertIn(job, failed_job_registry)
job.refresh()
self.assertIn("rq.timeouts.JobTimeoutException", job.exc_info)
+
+ # Test negative timeout doesn't raise JobTimeoutException,
+ # which implies an unintended immediate timeout.
+ job = q.enqueue(thread_friendly_sleep_func, args=(1,), job_timeout=-1)
+ w.work(burst=True)
+ job.refresh()
+ self.assertIn(job, finished_job_registry)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index cfce473..dfa0f1d 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -19,6 +19,7 @@ import pytest
from unittest import mock
from unittest.mock import Mock
+from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
from tests import RQTestCase, slow
from tests.fixtures import (
access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing,
@@ -607,6 +608,31 @@ class TestWorker(RQTestCase):
# Should not have created evidence of execution
self.assertEqual(os.path.exists(SENTINEL_FILE), False)
+ @slow
+ def test_max_idle_time(self):
+ q = Queue()
+ w = Worker([q])
+ q.enqueue(say_hello, args=('Frank',))
+ self.assertIsNotNone(w.dequeue_job_and_maintain_ttl(1))
+
+ # idle for 1 second
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=1))
+
+ # idle for 3 seconds
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(1, max_idle_time=3))
+ self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer
+
+ # idle for 2 seconds because idle_time is less than timeout
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(3, max_idle_time=2))
+ self.assertLess((utcnow()-now).total_seconds(), 4) # 4 for some buffer
+
+ # idle for 3 seconds because idle_time is less than two rounds of timeout
+ now = utcnow()
+ self.assertIsNone(w.dequeue_job_and_maintain_ttl(2, max_idle_time=3))
+ self.assertLess((utcnow()-now).total_seconds(), 5) # 5 for some buffer
+
@slow # noqa
def test_timeouts(self):
"""Worker kills jobs after timeout."""
@@ -639,7 +665,6 @@ class TestWorker(RQTestCase):
q = Queue()
w = Worker([q])
- # Put it on the queue with a timeout value
self.assertIsNone(w.dequeue_job_and_maintain_ttl(None))
def test_worker_ttl_param_resolves_timeout(self):
@@ -936,7 +961,15 @@ class TestWorker(RQTestCase):
worker.last_cleaned_at = utcnow()
self.assertFalse(worker.should_run_maintenance_tasks)
- worker.last_cleaned_at = utcnow() - timedelta(seconds=3700)
+ worker.last_cleaned_at = utcnow() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 100)
+ self.assertTrue(worker.should_run_maintenance_tasks)
+
+ # custom maintenance_interval
+ worker = Worker(queue, maintenance_interval=10)
+ self.assertTrue(worker.should_run_maintenance_tasks)
+ worker.last_cleaned_at = utcnow()
+ self.assertFalse(worker.should_run_maintenance_tasks)
+ worker.last_cleaned_at = utcnow() - timedelta(seconds=11)
self.assertTrue(worker.should_run_maintenance_tasks)
def test_worker_calls_clean_registries(self):
@@ -1103,6 +1136,59 @@ class TestWorker(RQTestCase):
worker = Worker.find_by_key(w2.key)
self.assertEqual(worker.python_version, python_version)
+ def test_dequeue_random_strategy(self):
+ qs = [Queue('q%d' % i) for i in range(5)]
+
+ for i in range(5):
+ for j in range(3):
+ qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
+
+ w = Worker(qs)
+ w.work(burst=True, dequeue_strategy="random")
+
+ start_times = []
+ for i in range(5):
+ for j in range(3):
+ job = Job.fetch('q%d_%d' % (i, j))
+ start_times.append(('q%d_%d' % (i, j), job.started_at))
+ sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
+ sorted_ids = [tup[0] for tup in sorted_by_time]
+ expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)]
+ expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)]
+
+ self.assertNotEqual(sorted_ids, expected_rr)
+ self.assertNotEqual(sorted_ids, expected_ser)
+ expected_rr.reverse()
+ expected_ser.reverse()
+ self.assertNotEqual(sorted_ids, expected_rr)
+ self.assertNotEqual(sorted_ids, expected_ser)
+ sorted_ids.sort()
+ expected_ser.sort()
+ self.assertEqual(sorted_ids, expected_ser)
+
+ def test_dequeue_round_robin(self):
+ qs = [Queue('q%d' % i) for i in range(5)]
+
+ for i in range(5):
+ for j in range(3):
+ qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
+
+ w = Worker(qs)
+ w.work(burst=True, dequeue_strategy="round_robin")
+
+ start_times = []
+ for i in range(5):
+ for j in range(3):
+ job = Job.fetch('q%d_%d' % (i, j))
+ start_times.append(('q%d_%d' % (i, j), job.started_at))
+ sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
+ sorted_ids = [tup[0] for tup in sorted_by_time]
+ expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0',
+ 'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1',
+ 'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2']
+
+ self.assertEqual(expected, sorted_ids)
+
def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait)
@@ -1224,13 +1310,14 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w.prepare_job_execution(job)
w.fork_work_horse(job, queue)
job.timeout = 5
-
time.sleep(1)
with open(sentinel_file) as f:
subprocess_pid = int(f.read().strip())
self.assertTrue(psutil.pid_exists(subprocess_pid))
- w.monitor_work_horse(job, queue)
+ with mock.patch.object(w, 'handle_work_horse_killed', wraps=w.handle_work_horse_killed) as mocked:
+ w.monitor_work_horse(job, queue)
+ self.assertEqual(mocked.call_count, 1)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor