summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2020-11-14 19:59:20 +0700
committerGitHub <noreply@github.com>2020-11-14 19:59:20 +0700
commit492e77d86df03ce343167a126cbf99f79a34c173 (patch)
treee36bff09d1ec6581042d00a0f76091ed75d001ff
parentf3e924cdd160cb99f138782b0c4a67620184e0a2 (diff)
downloadrq-492e77d86df03ce343167a126cbf99f79a34c173.tar.gz
send_stop_job_command (#1376)
* Added send_stop_job_command(). * send_stop_job_command now accepts just connection and job_id * Document send_job_job_command * Updated test coverage
-rw-r--r--docs/docs/workers.md13
-rw-r--r--rq/command.py66
-rw-r--r--rq/job.py3
-rw-r--r--rq/worker.py16
-rw-r--r--tests/test_commands.py56
5 files changed, 132 insertions, 22 deletions
diff --git a/docs/docs/workers.md b/docs/docs/workers.md
index 950565d..3e8ee4d 100644
--- a/docs/docs/workers.md
+++ b/docs/docs/workers.md
@@ -409,3 +409,16 @@ for worker in workers:
if worker.state = WorkerStatus.BUSY:
send_kill_horse_command(redis, worker.name)
```
+
+_New in version 1.7.0._
+* `send_stop_job_command()`: tells worker to stop a job.
+
+```python
+from redis import Redis
+from rq.command import send_stop_job_command
+
+redis = Redis()
+
+# This will raise an exception if job is invalid or not currently executing
+send_stop_job_command(redis, job_id)
+```
diff --git a/rq/command.py b/rq/command.py
index be8fea8..94eed96 100644
--- a/rq/command.py
+++ b/rq/command.py
@@ -1,13 +1,20 @@
import json
+import os
+import signal
+
+from rq.exceptions import InvalidJobOperation
+from rq.job import Job
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'
-def send_command(redis, worker_name, command):
- """Use Redis' pubsub mechanism to send a command"""
+def send_command(connection, worker_name, command, **kwargs):
+ """Use connection' pubsub mechanism to send a command"""
payload = {'command': command}
- redis.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload))
+ if kwargs:
+ payload.update(kwargs)
+ connection.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload))
def parse_payload(payload):
@@ -15,11 +22,56 @@ def parse_payload(payload):
return json.loads(payload.get('data').decode())
-def send_shutdown_command(redis, worker_name):
+def send_shutdown_command(connection, worker_name):
"""Send shutdown command"""
- send_command(redis, worker_name, 'shutdown')
+ send_command(connection, worker_name, 'shutdown')
-def send_kill_horse_command(redis, worker_name):
+def send_kill_horse_command(connection, worker_name):
"""Tell worker to kill it's horse"""
- send_command(redis, worker_name, 'kill-horse')
+ send_command(connection, worker_name, 'kill-horse')
+
+
+def send_stop_job_command(connection, job_id):
+ """Instruct a worker to stop a job"""
+ job = Job.fetch(job_id, connection=connection)
+ if not job.worker_name:
+ raise InvalidJobOperation('Job is not currently executing')
+ send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
+
+
+def handle_command(worker, payload):
+ """Parses payload and routes commands"""
+ if payload['command'] == 'stop-job':
+ handle_stop_job_command(worker, payload)
+ elif payload['command'] == 'shutdown':
+ handle_shutdown_command(worker)
+ elif payload['command'] == 'kill-horse':
+ handle_kill_worker_command(worker, payload)
+
+
+def handle_shutdown_command(worker):
+ """Perform shutdown command"""
+ worker.log.info('Received shutdown command, sending SIGINT signal.')
+ pid = os.getpid()
+ os.kill(pid, signal.SIGINT)
+
+
+def handle_kill_worker_command(worker, payload):
+ """Stops work horse"""
+ worker.log.info('Received kill horse command.')
+ if worker.horse_pid:
+ worker.log.info('Kiling horse...')
+ worker.kill_horse()
+ else:
+ worker.log.info('Worker is not working, kill horse command ignored')
+
+
+def handle_stop_job_command(worker, payload):
+ """Handles stop job command"""
+ job_id = payload.get('job_id')
+ worker.log.debug('Received command to stop job %s', job_id)
+ if job_id and worker.get_current_job_id() == job_id:
+ worker.kill_horse()
+ else:
+ worker.log.info('Not working on job %s, command ignored.', job_id)
diff --git a/rq/job.py b/rq/job.py
index 0f16d3f..eb3ca45 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -45,7 +45,8 @@ def truncate_long_string(data, maxlen=75):
""" Truncates strings longer than maxlen
"""
return (data[:maxlen] + '...') if len(data) > maxlen else data
-
+
+
def cancel_job(job_id, connection=None):
"""Cancels the job with the given job ID, preventing execution. Discards
any job info (i.e. it can't be requeued later).
diff --git a/rq/worker.py b/rq/worker.py
index cbe7073..ad5d0ae 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -25,7 +25,7 @@ except ImportError:
from redis import WatchError
from . import worker_registration
-from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE
+from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .compat import as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection
@@ -1071,18 +1071,10 @@ class Worker(object):
return False
def handle_payload(self, message):
+ """Handle external commands"""
+ self.log.debug('Received message: %s', message)
payload = parse_payload(message)
- if payload['command'] == 'shutdown':
- self.log.info('Received shutdown command, sending SIGINT signal.')
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- elif payload['command'] == 'kill-horse':
- self.log.info('Received kill horse command.')
- if self.horse_pid:
- self.log.info('Kiling horse...')
- self.kill_horse()
- else:
- self.log.info('Worker is not working, ignoring kill horse command')
+ handle_command(self, payload)
class SimpleWorker(Worker):
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 647a17a..a7a0dd6 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -6,7 +6,9 @@ from tests import RQTestCase
from tests.fixtures import long_running_job
from rq import Queue, Worker
-from rq.command import send_command, send_kill_horse_command, send_shutdown_command
+from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command
+from rq.exceptions import InvalidJobOperation, NoSuchJobError
+from rq.worker import WorkerStatus
class TestCommands(RQTestCase):
@@ -42,4 +44,54 @@ class TestCommands(RQTestCase):
worker.work(burst=True)
p.join(1)
job.refresh()
- self.assertTrue(job.id in queue.failed_job_registry) \ No newline at end of file
+ self.assertTrue(job.id in queue.failed_job_registry)
+
+ def start_work():
+ worker.work()
+
+ p = Process(target=start_work)
+ p.start()
+ p.join(2)
+
+ send_kill_horse_command(connection, worker.name)
+ worker.refresh()
+ # Since worker is not busy, command will be ignored
+ self.assertEqual(worker.get_state(), WorkerStatus.IDLE)
+ send_shutdown_command(connection, worker.name)
+
+ def test_stop_job_command(self):
+ """Ensure that stop_job command works properly."""
+
+ connection = self.testconn
+ queue = Queue('foo', connection=connection)
+ job = queue.enqueue(long_running_job, 3)
+ worker = Worker('foo', connection=connection)
+
+ # If job is not executing, an error is raised
+ with self.assertRaises(InvalidJobOperation):
+ send_stop_job_command(connection, job_id=job.id)
+
+ # An exception is raised if job ID is invalid
+ with self.assertRaises(NoSuchJobError):
+ send_stop_job_command(connection, job_id='1')
+
+ def start_work():
+ worker.work(burst=True)
+
+ p = Process(target=start_work)
+ p.start()
+ p.join(1)
+
+ time.sleep(0.1)
+
+ send_command(connection, worker.name, 'stop-job', job_id=1)
+ time.sleep(0.25)
+ # Worker still working due to job_id mismatch
+ worker.refresh()
+ self.assertEqual(worker.get_state(), WorkerStatus.BUSY)
+
+ send_stop_job_command(connection, job_id=job.id)
+ time.sleep(0.25)
+ # Worker has stopped working
+ worker.refresh()
+ self.assertEqual(worker.get_state(), WorkerStatus.IDLE)