summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-10-22 07:26:24 +0700
committerGitHub <noreply@github.com>2020-10-22 07:26:24 +0700
commita721db34b171fd38eb262c858cc5ca32cc954bfb (patch)
treeebf84a4dd249310db52a03e1c5dedec700604a0a
parent0e65bab10b1dc9040c98b7d2bf108f3279c84de5 (diff)
downloadrq-a721db34b171fd38eb262c858cc5ca32cc954bfb.tar.gz
Workers can listen to external commands via pubsub (#1363)
* Added a way to send shutdown command via pubsub * Added kill-horse command * Added kill horse command * Added send_kill_horse_command() and send_shutdown_command() * Document worker commands
-rw-r--r--docs/docs/workers.md38
-rw-r--r--rq/command.py25
-rw-r--r--rq/worker.py54
-rw-r--r--tests/test_commands.py45
4 files changed, 157 insertions, 5 deletions
diff --git a/docs/docs/workers.md b/docs/docs/workers.md
index 25d0095..950565d 100644
--- a/docs/docs/workers.md
+++ b/docs/docs/workers.md
@@ -371,3 +371,41 @@ If you want to disable RQ's default exception handler, use the `--disable-defaul
```console
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler
```
+
+
+## Sending Commands to Worker
+_New in version 1.6.0._
+
+Starting in version 1.6.0, workers use Redis' pubsub mechanism to listen to external commands while
+they're working. Two commands are currently implemented:
+
+* `send_shutdown_command()`: sends shutdown command to worker. This is similar to sending a SIGINT
+signal to a worker.
+
+```python
+from redis import Redis
+from rq.command import send_shutdown_command
+from rq.worker import Worker
+
+redis = Redis()
+
+workers = Worker.all(redis)
+for worker in workers:
+ send_shutdown_command(redis, worker.name) # Tells worker to shutdown
+```
+
+* `send_kill_horse_command()`: tells a worker to cancel a currently executing job. If worker is
+not currently working, this command will be ignored.
+
+```python
+from redis import Redis
+from rq.command import send_kill_horse_command
+from rq.worker import Worker, WorkerStatus
+
+redis = Redis()
+
+workers = Worker.all(redis)
+for worker in workers:
+ if worker.state = WorkerStatus.BUSY:
+ send_kill_horse_command(redis, worker.name)
+```
diff --git a/rq/command.py b/rq/command.py
new file mode 100644
index 0000000..be8fea8
--- /dev/null
+++ b/rq/command.py
@@ -0,0 +1,25 @@
+import json
+
+
+PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'
+
+
+def send_command(redis, worker_name, command):
+ """Use Redis' pubsub mechanism to send a command"""
+ payload = {'command': command}
+ redis.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload))
+
+
+def parse_payload(payload):
+ """Returns a dict of command data"""
+ return json.loads(payload.get('data').decode())
+
+
+def send_shutdown_command(redis, worker_name):
+ """Send shutdown command"""
+ send_command(redis, worker_name, 'shutdown')
+
+
+def send_kill_horse_command(redis, worker_name):
+ """Tell worker to kill it's horse"""
+ send_command(redis, worker_name, 'kill-horse')
diff --git a/rq/worker.py b/rq/worker.py
index 88c72b0..13ed298 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import errno
+import json
import logging
import os
import random
@@ -25,6 +26,7 @@ except ImportError:
from redis import WatchError
from . import worker_registration
+from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE
from .compat import PY2, as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection
@@ -211,6 +213,8 @@ class Worker(object):
self.total_working_time = 0
self.birth_date = None
self.scheduler = None
+ self.pubsub = None
+ self.pubsub_thread = None
self.disable_default_exception_handler = disable_default_exception_handler
@@ -246,6 +250,11 @@ class Worker(object):
return self.redis_worker_namespace_prefix + self.name
@property
+ def pubsub_channel_name(self):
+ """Returns the worker's Redis hash key."""
+ return PUBSUB_CHANNEL_TEMPLATE % self.name
+
+ @property
def horse_pid(self):
"""The horse's process ID. Only available in the worker. Will return
0 in the horse part of the fork.
@@ -385,7 +394,6 @@ class Worker(object):
"""Installs signal handlers for handling SIGINT and SIGTERM
gracefully.
"""
-
signal.signal(signal.SIGINT, self.request_stop)
signal.signal(signal.SIGTERM, self.request_stop)
@@ -438,9 +446,13 @@ class Worker(object):
signal.signal(signal.SIGTERM, self.request_force_stop)
self.handle_warm_shutdown_request()
-
- # If shutdown is requested in the middle of a job, wait until
- # finish before shutting down and save the request in redis
+ self._shutdown()
+
+ def _shutdown(self):
+ """
+ If shutdown is requested in the middle of a job, wait until
+ finish before shutting down and save the request in redis
+ """
if self.get_state() == WorkerStatus.BUSY:
self._stop_requested = True
self.set_shutdown_requested_date()
@@ -492,6 +504,22 @@ class Worker(object):
if self.scheduler and not self.scheduler._process:
self.scheduler.acquire_locks(auto_start=True)
self.clean_registries()
+
+ def subscribe(self):
+ """Subscribe to this worker's channel"""
+ self.log.info('Subscribing to channel %s', self.pubsub_channel_name)
+ self.pubsub = self.connection.pubsub()
+ self.pubsub.subscribe(**{self.pubsub_channel_name: self.handle_payload})
+ self.pubsub_thread = self.pubsub.run_in_thread(sleep_time=0.2)
+
+ def unsubscribe(self):
+ """Unsubscribe from pubsub channel"""
+ if self.pubsub_thread:
+ self.log.info('Unsubscribing from channel %s', self.pubsub_channel_name)
+ self.pubsub_thread.stop()
+ self.pubsub_thread.join()
+ self.pubsub.unsubscribe()
+ self.pubsub.close()
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False):
@@ -507,6 +535,7 @@ class Worker(object):
completed_jobs = 0
self.register_birth()
self.log.info("Worker %s: started, version %s", self.key, VERSION)
+ self.subscribe()
self.set_state(WorkerStatus.STARTED)
qnames = self.queue_names()
self.log.info('*** Listening on %s...', green(', '.join(qnames)))
@@ -538,7 +567,6 @@ class Worker(object):
break
timeout = None if burst else max(1, self.default_worker_ttl - 15)
-
result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None:
if burst:
@@ -578,6 +606,7 @@ class Worker(object):
self.stop_scheduler()
self.register_death()
+ self.unsubscribe()
return bool(completed_jobs)
def stop_scheduler(self):
@@ -742,6 +771,7 @@ class Worker(object):
# Send a heartbeat to keep the worker alive.
self.heartbeat()
+ self._horse_pid = 0 # Set horse PID to 0, horse has finished working
if ret_val == os.EX_OK: # The process exited normally.
return
job_status = job.get_status()
@@ -1047,6 +1077,20 @@ class Worker(object):
return True
return False
+ def handle_payload(self, message):
+ payload = parse_payload(message)
+ if payload['command'] == 'shutdown':
+ self.log.info('Received shutdown command, sending SIGINT signal.')
+ pid = os.getpid()
+ os.kill(pid, signal.SIGINT)
+ elif payload['command'] == 'kill-horse':
+ self.log.info('Received kill horse command.')
+ if self.horse_pid:
+ self.log.info('Kiling horse...')
+ self.kill_horse()
+ else:
+ self.log.info('Worker is not working, ignoring kill horse command')
+
class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs):
diff --git a/tests/test_commands.py b/tests/test_commands.py
new file mode 100644
index 0000000..647a17a
--- /dev/null
+++ b/tests/test_commands.py
@@ -0,0 +1,45 @@
+import time
+
+from multiprocessing import Process
+
+from tests import RQTestCase
+from tests.fixtures import long_running_job
+
+from rq import Queue, Worker
+from rq.command import send_command, send_kill_horse_command, send_shutdown_command
+
+
+class TestCommands(RQTestCase):
+
+ def test_shutdown_command(self):
+ """Ensure that shutdown command works properly."""
+ connection = self.testconn
+ worker = Worker('foo', connection=connection)
+
+ def _send_shutdown_command():
+ time.sleep(0.25)
+ send_shutdown_command(connection, worker.name)
+
+ p = Process(target=_send_shutdown_command)
+ p.start()
+ worker.work()
+ p.join(1)
+
+ def test_kill_horse_command(self):
+ """Ensure that shutdown command works properly."""
+ connection = self.testconn
+ queue = Queue('foo', connection=connection)
+ job = queue.enqueue(long_running_job, 4)
+ worker = Worker('foo', connection=connection)
+
+ def _send_kill_horse_command():
+ """Waits 0.25 seconds before sending kill-horse command"""
+ time.sleep(0.25)
+ send_kill_horse_command(connection, worker.name)
+
+ p = Process(target=_send_kill_horse_command)
+ p.start()
+ worker.work(burst=True)
+ p.join(1)
+ job.refresh()
+ self.assertTrue(job.id in queue.failed_job_registry) \ No newline at end of file