summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorf0cker <f0cker@users.noreply.github.com>2021-01-19 01:19:31 +0000
committerGitHub <noreply@github.com>2021-01-19 08:19:31 +0700
commitefe703214e1015db53d4942398435a8c597d7a2d (patch)
tree111c2f7c6d356373aecdd9786ee03560d8beeafe
parent11c8631921cd9738b94c17937315ec9dba0041b7 (diff)
downloadrq-efe703214e1015db53d4942398435a8c597d7a2d.tar.gz
Added --serializer option to cli, finishing off PR #1381 and fix #1357 (#1395)
* Added --serializer option to cli, finishing off PR #1381 and fix #1357 * Update documentation * Update documentation * Modified help message Co-authored-by: f0cker <dturner@trustwave.com>
-rw-r--r--docs/docs/workers.md12
-rwxr-xr-xrq/cli/cli.py16
-rw-r--r--tests/test_cli.py11
3 files changed, 29 insertions, 10 deletions
diff --git a/docs/docs/workers.md b/docs/docs/workers.md
index 9d2f134..ae89b4a 100644
--- a/docs/docs/workers.md
+++ b/docs/docs/workers.md
@@ -69,6 +69,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
* `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'`
* `--disable-job-desc-logging`: Turn off job description logging.
* `--max-jobs`: Maximum number of jobs to execute.
+* `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer")
## Inside the worker
@@ -205,23 +206,24 @@ workers = Worker.all(queue=queue)
## Worker with Custom Serializer
When creating a worker, you can pass in a custom serializer that will be implicitly passed to the queue.
-Serializers used should have at least `loads` and `dumps` method.
+Serializers used should have at least `loads` and `dumps` method. An example of creating a custom serializer
+class can be found in serializers.py (rq.serializers.JSONSerializer).
The default serializer used is `pickle`
```python
-import json
from rq import Worker
+from rq.serialzers import JSONSerializer
-job = Worker('foo', serializer=json)
+job = Worker('foo', serializer=JSONSerializer)
```
or when creating from a queue
```python
-import json
from rq import Queue, Worker
+from rq.serialzers import JSONSerializer
-w = Worker(Queue('foo'), serializer=json)
+w = Queue('foo', serializer=JSONSerializer)
```
Queues will now use custom serializer
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index f373b9e..95e8f3f 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -25,11 +25,13 @@ from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries
from rq.utils import import_attribute
+from rq.serializers import DefaultSerializer
from rq.suspension import (suspend as connection_suspend,
resume as connection_resume, is_suspended)
from rq.worker_registration import clean_worker_registry
+
# Disable the warning that Click displays (as of Click version 5.0) when users
# use unicode_literals in Python 2.
# See http://click.pocoo.org/dev/python3/#unicode-literals for more details.
@@ -62,7 +64,10 @@ shared_options = [
click.option('--path', '-P',
default='.',
help='Specify the import path.',
- multiple=True)
+ multiple=True),
+ click.option('--serializer', '-S',
+ default=DefaultSerializer,
+ help='Path to serializer, defaults to rq.serializers.DefaultSerializer')
]
@@ -117,7 +122,7 @@ def empty(cli_config, all, queues, **options):
@click.option('--queue', required=True, type=str)
@click.argument('job_ids', nargs=-1)
@pass_cli_config
-def requeue(cli_config, queue, all, job_class, job_ids, **options):
+def requeue(cli_config, queue, all, job_class, job_ids, **options):
"""Requeue failed jobs."""
failed_job_registry = FailedJobRegistry(queue,
@@ -203,13 +208,14 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
+@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.argument('queues', nargs=-1)
@pass_cli_config
def worker(cli_config, burst, logging_level, name, results_ttl,
worker_ttl, job_monitoring_interval, disable_job_desc_logging,
verbose, quiet, sentry_ca_certs, sentry_debug, sentry_dsn,
exception_handler, pid, disable_default_exception_handler, max_jobs,
- with_scheduler, queues, log_format, date_format, **options):
+ with_scheduler, queues, log_format, date_format, serializer, **options):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
@@ -226,7 +232,6 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
try:
-
cleanup_ghosts(cli_config.connection)
exception_handlers = []
for h in exception_handler:
@@ -247,7 +252,8 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
job_class=cli_config.job_class, queue_class=cli_config.queue_class,
exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler,
- log_job_description=not disable_job_desc_logging
+ log_job_description=not disable_job_desc_logging,
+ serializer=serializer
)
# Should we configure Sentry?
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 51d4e60..8c5aa62 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -12,6 +12,7 @@ from rq.cli import main
from rq.cli.helpers import read_config_file, CliConfig
from rq.job import Job
from rq.registry import FailedJobRegistry, ScheduledJobRegistry
+from rq.serializers import JSONSerializer
from rq.worker import Worker, WorkerStatus
import pytest
@@ -346,3 +347,13 @@ class TestRQCli(RQTestCase):
self.assertEqual(result.exit_code, 1)
self.assertIn("Duration must be an integer greater than 1", result.output)
+
+ def test_serializer(self):
+ """rq worker -u <url> --serializer <serializer>"""
+ connection = Redis.from_url(self.redis_url)
+ q = Queue('default', connection=connection, serializer=JSONSerializer)
+ runner = CliRunner()
+ job = q.enqueue(say_hello)
+ runner.invoke(main, ['worker', '-u', self.redis_url,
+ '--serializer rq.serializer.JSONSerializer'])
+ self.assertIn(job.id, q.job_ids)