summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Adda Satya Ram <34022496+Asrst@users.noreply.github.com> 2021-01-14 13:01:41 +0530
committer GitHub <noreply@github.com> 2021-01-14 14:31:41 +0700
commit 11c8631921cd9738b94c17937315ec9dba0041b7 (patch)
tree 5ced63642c44728b7144cc797f9b01a4bc7c9c79
parent 709043989a8d2067e588a1421089c9a7e28fc02a (diff)
download rq-11c8631921cd9738b94c17937315ec9dba0041b7.tar.gz
Add exception to catch redis connection failure to retry after wait time (#1387)
* add exception catch for redis connection failure
* Add test for connection recovery
* add exponential backoff
* limit worker max connection wait time to 60 seconds
* fix undefined class variable
* fix string formatting issue while printing error log
* cap max connection wait time: better code style

Co-authored-by: corynezin <cory.nezin@gmail.com>
-rw-r--r-- rq/worker.py 28
-rw-r--r-- tests/test_worker.py 14
2 files changed, 34 insertions, 8 deletions
diff --git a/rq/worker.py b/rq/worker.py
index 8aa07bd..c5762fd 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -22,7 +22,7 @@ try:
except ImportError:
from signal import SIGTERM as SIGKILL
-from redis import WatchError
+import redis.exceptions
from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
@@ -106,6 +106,10 @@ class Worker(object):
log_result_lifespan = True
# `log_job_description` is used to toggle logging an entire jobs description.
log_job_description = True
+ # factor to increase connection_wait_time in case of continuous connection failures.
+ exponential_backoff_factor = 2.0
+ # Max wait time (in seconds) after which exponential_backoff_factor won't be applied.
+ max_connection_wait_time = 60.0
@classmethod
def all(cls, connection=None, job_class=None, queue_class=None, queue=None, serializer=None):
@@ -469,7 +473,6 @@ class Worker(object):
def check_for_suspension(self, burst):
"""Check to see if workers have been suspended by `rq suspend`"""
-
before_state = None
notified = False
@@ -628,14 +631,15 @@ class Worker(object):
self.set_state(WorkerStatus.IDLE)
self.procline('Listening on ' + qnames)
self.log.debug('*** Listening on %s...', green(qnames))
-
+ connection_wait_time = 1.0
while True:
- self.heartbeat()
-
- if self.should_run_maintenance_tasks:
- self.run_maintenance_tasks()
try:
+ self.heartbeat()
+
+ if self.should_run_maintenance_tasks:
+ self.run_maintenance_tasks()
+
result = self.queue_class.dequeue_any(self.queues, timeout,
connection=self.connection,
job_class=self.job_class,
@@ -654,6 +658,14 @@ class Worker(object):
break
except DequeueTimeout:
pass
+ except redis.exceptions.ConnectionError as conn_err:
+ self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...',
+ conn_err, connection_wait_time)
+ time.sleep(connection_wait_time)
+ connection_wait_time *= self.exponential_backoff_factor
+ connection_wait_time = min(connection_wait_time, self.max_connection_wait_time)
+ else:
+ connection_wait_time = 1.0
self.heartbeat()
return result
@@ -955,7 +967,7 @@ class Worker(object):
pipeline.execute()
break
- except WatchError:
+ except redis.exceptions.WatchError:
continue
def perform_job(self, job, queue, heartbeat_ttl=None):
diff --git a/tests/test_worker.py b/tests/test_worker.py
index f30cc1a..1cf2be1 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -18,6 +18,7 @@ from time import sleep
from unittest import skipIf
+import redis.exceptions
import pytest
import mock
from mock import Mock
@@ -284,6 +285,19 @@ class TestWorker(RQTestCase):
w.refresh()
@slow
+ def test_heartbeat_survives_lost_connection(self):
+ with mock.patch.object(Worker, 'heartbeat') as mocked:
+ # None -> Heartbeat is first called before the job loop
+ mocked.side_effect = [None, redis.exceptions.ConnectionError()]
+ q = Queue()
+ w = Worker([q])
+ w.work(burst=True)
+ # First call is prior to job loop, second raises the error,
+ # third is successful, after "recovery"
+ assert mocked.call_count == 3
+
+
+ @slow
def test_heartbeat_busy(self):
"""Periodic heartbeats while horse is busy with long jobs"""
q = Queue()