summaryrefslogtreecommitdiff
path: root/rq/command.py
blob: 0f8ef6e8bb1cd5a764992841776d08dd06f71099 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import json
import os
import signal

from typing import TYPE_CHECKING, Dict, Any

if TYPE_CHECKING:
    from redis import Redis
    from .worker import Worker

from rq.exceptions import InvalidJobOperation
from rq.job import Job


# Template for the Redis Pub/Sub channel name used to deliver commands;
# the `%s` placeholder is filled with the worker name (see `send_command`).
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'


def send_command(connection: 'Redis', worker_name: str, command: str, **kwargs):
    """
    Publishes a command for a worker on that worker's Pub/Sub channel.

    The message is a JSON object holding the command name under the
    `command` key plus any extra keyword arguments. Commands routed by
    `handle_command` are:
        - `shutdown`: Shuts down a worker
        - `kill-horse`: Tells the worker to kill its current work horse
        - `stop-job`: Tells the worker to stop the currently running job

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): Name of the worker the command is addressed to
        command (str): The command name
        **kwargs: Extra fields merged into the message (e.g. `job_id`)
    """
    # `command` is a named parameter, so kwargs can never carry a
    # conflicting 'command' key.
    message = {'command': command, **kwargs}
    channel = PUBSUB_CHANNEL_TEMPLATE % worker_name
    connection.publish(channel, json.dumps(message))


def parse_payload(payload: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Returns a dict of command data decoded from a Pub/Sub message.

    Args:
        payload (dict): The Pub/Sub message. Its `data` entry holds the
            JSON-encoded command, either as bytes or as an already-decoded
            str (e.g. when the Redis client decodes responses).

    Returns:
        dict: The decoded JSON content of the message's `data` entry.
    """
    data = payload.get('data')
    if isinstance(data, str):
        # Already text: calling `.decode()` on it would raise AttributeError.
        return json.loads(data)
    return json.loads(data.decode())


def send_shutdown_command(connection: 'Redis', worker_name: str):
    """
    Publishes the `shutdown` command for the given worker.

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): Name of the worker to shut down
    """
    send_command(connection, worker_name, command='shutdown')


def send_kill_horse_command(connection: 'Redis', worker_name: str):
    """
    Publishes the `kill-horse` command, telling a worker to kill its horse.

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): Name of the worker whose horse should be killed
    """
    send_command(connection, worker_name, command='kill-horse')


def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None):
    """
    Publishes a `stop-job` command to the worker recorded on the job.

    The job is fetched first so the command can be addressed to the worker
    named in its `worker_name` attribute.

    Args:
        connection (Redis): A Redis Connection
        job_id (str): The Job ID
        serializer (): The serializer passed through to `Job.fetch`

    Raises:
        InvalidJobOperation: If the fetched job has no worker name set.
    """
    job = Job.fetch(job_id, connection=connection, serializer=serializer)
    if job.worker_name:
        send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
    else:
        raise InvalidJobOperation('Job is not currently executing')


def handle_command(worker: 'Worker', payload: Dict[Any, Any]):
    """Routes a command payload to the matching handler.

    The handler is chosen by the payload's `command` entry. A payload whose
    command matches none of the known names is ignored; a payload without a
    `command` key raises `KeyError`.

    Args:
        worker (Worker): The worker to use
        payload (Dict[Any, Any]): The Payload
    """
    command = payload['command']
    if command == 'shutdown':
        handle_shutdown_command(worker)
    elif command == 'kill-horse':
        handle_kill_worker_command(worker, payload)
    elif command == 'stop-job':
        handle_stop_job_command(worker, payload)


def handle_shutdown_command(worker: 'Worker'):
    """Handles the shutdown command.

    Logs the request, then sends SIGINT to the current process.

    Args:
        worker (Worker): The worker to use.
    """
    worker.log.info('Received shutdown command, sending SIGINT signal.')
    os.kill(os.getpid(), signal.SIGINT)


def handle_kill_worker_command(worker: 'Worker', payload: Dict[Any, Any]):
    """
    Kills the worker's horse if one is running.

    When `worker.horse_pid` is falsy the command is logged and ignored.

    Args:
        worker (Worker): The worker whose horse should be killed
        payload (Dict[Any, Any]): The payload (not used by this handler).
    """
    log = worker.log
    log.info('Received kill horse command.')
    if not worker.horse_pid:
        # No horse process recorded, so there is nothing to kill.
        log.info('Worker is not working, kill horse command ignored')
        return
    log.info('Kiling horse...')
    worker.kill_horse()


def handle_stop_job_command(worker: 'Worker', payload: Dict[Any, Any]):
    """Handles the stop-job command.

    The horse is killed only when the payload's `job_id` is set and equals
    the ID of the job the worker is currently running; otherwise the command
    is logged and ignored.

    Args:
        worker (Worker): The worker to use
        payload (Dict[Any, Any]): The payload; `job_id` names the job to stop.
    """
    job_id = payload.get('job_id')
    worker.log.debug('Received command to stop job %s', job_id)
    if not job_id or worker.get_current_job_id() != job_id:
        worker.log.info('Not working on job %s, command ignored.', job_id)
        return
    # Record the ID before killing the horse so that the job failure handler
    # knows the stop was intentional.
    worker._stopped_job_id = job_id
    worker.kill_horse()