-rw-r--r--   rq/compat/__init__.py      |  61
-rw-r--r--   rq/compat/connections.py   |   9
-rw-r--r--   rq/compat/dictconfig.py    | 554
-rw-r--r--   rq/decorators.py           |   4
-rw-r--r--   rq/job.py                  |   8
-rw-r--r--   rq/queue.py                |   7
-rw-r--r--   rq/registry.py             |   2
-rw-r--r--   rq/results.py              |   6
-rw-r--r--   rq/serializers.py          |   3
-rw-r--r--   rq/utils.py                |  20
-rw-r--r--   rq/worker.py               |   6
-rw-r--r--   rq/worker_registration.py  |   2
-rw-r--r--   tests/fixtures.py          |   3
-rw-r--r--   tests/test_job.py          |   2
-rw-r--r--   tests/test_registry.py     |   2
-rw-r--r--   tests/test_worker.py       |   2
16 files changed, 37 insertions(+), 654 deletions(-)
diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py
deleted file mode 100644
index c2275e2..0000000
--- a/rq/compat/__init__.py
+++ /dev/null
@@ -1,61 +0,0 @@
-import sys
-
-
-def is_python_version(*versions):
- for version in versions:
- if (sys.version_info[0] == version[0] and sys.version_info >= version):
- return True
- return False
-
-
-try:
- from functools import total_ordering
-except ImportError:
- def total_ordering(cls): # noqa
- """Class decorator that fills in missing ordering methods"""
- convert = {
- '__lt__': [('__gt__', lambda self, other: other < self),
- ('__le__', lambda self, other: not other < self),
- ('__ge__', lambda self, other: not self < other)],
- '__le__': [('__ge__', lambda self, other: other <= self),
- ('__lt__', lambda self, other: not other <= self),
- ('__gt__', lambda self, other: not self <= other)],
- '__gt__': [('__lt__', lambda self, other: other > self),
- ('__ge__', lambda self, other: not other > self),
- ('__le__', lambda self, other: not self > other)],
- '__ge__': [('__le__', lambda self, other: other >= self),
- ('__gt__', lambda self, other: not other >= self),
- ('__lt__', lambda self, other: not self >= other)]
- }
- roots = set(dir(cls)) & set(convert)
- if not roots:
- raise ValueError('must define at least one ordering operation: < > <= >=') # noqa
- root = max(roots) # prefer __lt__ to __le__ to __gt__ to __ge__
- for opname, opfunc in convert[root]:
- if opname not in roots:
- opfunc.__name__ = str(opname)
- opfunc.__doc__ = getattr(int, opname).__doc__
- setattr(cls, opname, opfunc)
- return cls
-
-
-PY2 = sys.version_info[0] == 2
-
-# Python 3.x and up
-text_type = str
-string_types = (str,)
-
-
-def as_text(v):
- if v is None:
- return None
- elif isinstance(v, bytes):
- return v.decode('utf-8')
- elif isinstance(v, str):
- return v
- else:
- raise ValueError('Unknown type %r' % type(v))
-
-
-def decode_redis_hash(h):
- return dict((as_text(k), h[k]) for k in h)
diff --git a/rq/compat/connections.py b/rq/compat/connections.py
deleted file mode 100644
index 49b9685..0000000
--- a/rq/compat/connections.py
+++ /dev/null
@@ -1,9 +0,0 @@
-def fix_return_type(func):
- # deliberately no functools.wraps() call here, since the function being
- # wrapped is a partial, which has no module
- def _inner(*args, **kwargs):
- value = func(*args, **kwargs)
- if value is None:
- value = -1
- return value
- return _inner
diff --git a/rq/compat/dictconfig.py b/rq/compat/dictconfig.py
deleted file mode 100644
index c9876da..0000000
--- a/rq/compat/dictconfig.py
+++ /dev/null
@@ -1,554 +0,0 @@
-# flake8: noqa
-# This is a copy of the Python logging.config.dictconfig module. It is
-# provided here for backwards compatibility for Python versions prior to 2.7.
-#
-# Copyright 2009-2010 by Vinay Sajip. All Rights Reserved.
-#
-# Permission to use, copy, modify, and distribute this software and its
-# documentation for any purpose and without fee is hereby granted,
-# provided that the above copyright notice appear in all copies and that
-# both that copyright notice and this permission notice appear in
-# supporting documentation, and that the name of Vinay Sajip
-# not be used in advertising or publicity pertaining to distribution
-# of the software without specific, written prior permission.
-# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
-# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
-# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
-# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
-# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
-# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-import logging.handlers
-import re
-import sys
-import types
-from rq.compat import string_types
-
-IDENTIFIER = re.compile('^[a-z_][a-z0-9_]*$', re.I)
-
-def valid_ident(s):
- m = IDENTIFIER.match(s)
- if not m:
- raise ValueError('Not a valid Python identifier: %r' % s)
- return True
-
-#
-# This function is defined in logging only in recent versions of Python
-#
-try:
- from logging import _checkLevel
-except ImportError:
- def _checkLevel(level):
- if isinstance(level, int):
- rv = level
- elif str(level) == level:
- if level not in logging._levelNames:
- raise ValueError('Unknown level: %r' % level)
- rv = logging._levelNames[level]
- else:
- raise TypeError('Level not an integer or a '
- 'valid string: %r' % level)
- return rv
-
-# The ConvertingXXX classes are wrappers around standard Python containers,
-# and they serve to convert any suitable values in the container. The
-# conversion converts base dicts, lists and tuples to their wrapped
-# equivalents, whereas strings which match a conversion format are converted
-# appropriately.
-#
-# Each wrapper should have a configurator attribute holding the actual
-# configurator to use for conversion.
-
-class ConvertingDict(dict):
- """A converting dictionary wrapper."""
-
- def __getitem__(self, key):
- value = dict.__getitem__(self, key)
- result = self.configurator.convert(value)
- #If the converted value is different, save for next time
- if value is not result:
- self[key] = result
- if type(result) in (ConvertingDict, ConvertingList,
- ConvertingTuple):
- result.parent = self
- result.key = key
- return result
-
- def get(self, key, default=None):
- value = dict.get(self, key, default)
- result = self.configurator.convert(value)
- #If the converted value is different, save for next time
- if value is not result:
- self[key] = result
- if type(result) in (ConvertingDict, ConvertingList,
- ConvertingTuple):
- result.parent = self
- result.key = key
- return result
-
- def pop(self, key, default=None):
- value = dict.pop(self, key, default)
- result = self.configurator.convert(value)
- if value is not result:
- if type(result) in (ConvertingDict, ConvertingList,
- ConvertingTuple):
- result.parent = self
- result.key = key
- return result
-
-class ConvertingList(list):
- """A converting list wrapper."""
- def __getitem__(self, key):
- value = list.__getitem__(self, key)
- result = self.configurator.convert(value)
- #If the converted value is different, save for next time
- if value is not result:
- self[key] = result
- if type(result) in (ConvertingDict, ConvertingList,
- ConvertingTuple):
- result.parent = self
- result.key = key
- return result
-
- def pop(self, idx=-1):
- value = list.pop(self, idx)
- result = self.configurator.convert(value)
- if value is not result:
- if type(result) in (ConvertingDict, ConvertingList,
- ConvertingTuple):
- result.parent = self
- return result
-
-class ConvertingTuple(tuple):
- """A converting tuple wrapper."""
- def __getitem__(self, key):
- value = tuple.__getitem__(self, key)
- result = self.configurator.convert(value)
- if value is not result:
- if type(result) in (ConvertingDict, ConvertingList,
- ConvertingTuple):
- result.parent = self
- result.key = key
- return result
-
-class BaseConfigurator:
- """
- The configurator base class which defines some useful defaults.
- """
-
- CONVERT_PATTERN = re.compile(r'^(?P<prefix>[a-z]+)://(?P<suffix>.*)$')
-
- WORD_PATTERN = re.compile(r'^\s*(\w+)\s*')
- DOT_PATTERN = re.compile(r'^\.\s*(\w+)\s*')
- INDEX_PATTERN = re.compile(r'^\[\s*(\w+)\s*\]\s*')
- DIGIT_PATTERN = re.compile(r'^\d+$')
-
- value_converters = {
- 'ext' : 'ext_convert',
- 'cfg' : 'cfg_convert',
- }
-
- # We might want to use a different one, e.g. importlib
- importer = __import__
-
- def __init__(self, config):
- self.config = ConvertingDict(config)
- self.config.configurator = self
-
- def resolve(self, s):
- """
- Resolve strings to objects using standard import and attribute
- syntax.
- """
- name = s.split('.')
- used = name.pop(0)
- try:
- found = self.importer(used)
- for frag in name:
- used += '.' + frag
- try:
- found = getattr(found, frag)
- except AttributeError:
- self.importer(used)
- found = getattr(found, frag)
- return found
- except ImportError:
- e, tb = sys.exc_info()[1:]
- v = ValueError('Cannot resolve %r: %s' % (s, e))
- v.__cause__, v.__traceback__ = e, tb
- raise v
-
- def ext_convert(self, value):
- """Default converter for the ext:// protocol."""
- return self.resolve(value)
-
- def cfg_convert(self, value):
- """Default converter for the cfg:// protocol."""
- rest = value
- m = self.WORD_PATTERN.match(rest)
- if m is None:
- raise ValueError("Unable to convert %r" % value)
- else:
- rest = rest[m.end():]
- d = self.config[m.groups()[0]]
- #print d, rest
- while rest:
- m = self.DOT_PATTERN.match(rest)
- if m:
- d = d[m.groups()[0]]
- else:
- m = self.INDEX_PATTERN.match(rest)
- if m:
- idx = m.groups()[0]
- if not self.DIGIT_PATTERN.match(idx):
- d = d[idx]
- else:
- try:
- n = int(idx) # try as number first (most likely)
- d = d[n]
- except TypeError:
- d = d[idx]
- if m:
- rest = rest[m.end():]
- else:
- raise ValueError('Unable to convert '
- '%r at %r' % (value, rest))
- #rest should be empty
- return d
-
- def convert(self, value):
- """
- Convert values to an appropriate type. dicts, lists and tuples are
- replaced by their converting alternatives. Strings are checked to
- see if they have a conversion format and are converted if they do.
- """
- if not isinstance(value, ConvertingDict) and isinstance(value, dict):
- value = ConvertingDict(value)
- value.configurator = self
- elif not isinstance(value, ConvertingList) and isinstance(value, list):
- value = ConvertingList(value)
- value.configurator = self
- elif not isinstance(value, ConvertingTuple) and\
- isinstance(value, tuple):
- value = ConvertingTuple(value)
- value.configurator = self
- elif isinstance(value, string_types): # str for py3k
- m = self.CONVERT_PATTERN.match(value)
- if m:
- d = m.groupdict()
- prefix = d['prefix']
- converter = self.value_converters.get(prefix, None)
- if converter:
- suffix = d['suffix']
- converter = getattr(self, converter)
- value = converter(suffix)
- return value
-
- def configure_custom(self, config):
- """Configure an object with a user-supplied factory."""
- c = config.pop('()')
- if not hasattr(c, '__call__') and type(c) != type:
- c = self.resolve(c)
- props = config.pop('.', None)
- # Check for valid identifiers
- kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
- result = c(**kwargs)
- if props:
- for name, value in props.items():
- setattr(result, name, value)
- return result
-
- def as_tuple(self, value):
- """Utility function which converts lists to tuples."""
- if isinstance(value, list):
- value = tuple(value)
- return value
-
-class DictConfigurator(BaseConfigurator):
- """
- Configure logging using a dictionary-like object to describe the
- configuration.
- """
-
- def configure(self):
- """Do the configuration."""
-
- config = self.config
- if 'version' not in config:
- raise ValueError("dictionary doesn't specify a version")
- if config['version'] != 1:
- raise ValueError("Unsupported version: %s" % config['version'])
- incremental = config.pop('incremental', False)
- EMPTY_DICT = {}
- logging._acquireLock()
- try:
- if incremental:
- handlers = config.get('handlers', EMPTY_DICT)
- # incremental handler config only if handler name
- # ties in to logging._handlers (Python 2.7)
- if sys.version_info[:2] == (2, 7):
- for name in handlers:
- if name not in logging._handlers:
- raise ValueError('No handler found with '
- 'name %r' % name)
- else:
- try:
- handler = logging._handlers[name]
- handler_config = handlers[name]
- level = handler_config.get('level', None)
- if level:
- handler.setLevel(_checkLevel(level))
- except Exception as e:
- raise ValueError('Unable to configure handler '
- '%r: %s' % (name, e))
- loggers = config.get('loggers', EMPTY_DICT)
- for name in loggers:
- try:
- self.configure_logger(name, loggers[name], True)
- except Exception as e:
- raise ValueError('Unable to configure logger '
- '%r: %s' % (name, e))
- root = config.get('root', None)
- if root:
- try:
- self.configure_root(root, True)
- except Exception as e:
- raise ValueError('Unable to configure root '
- 'logger: %s' % e)
- else:
- disable_existing = config.pop('disable_existing_loggers', True)
-
- logging._handlers.clear()
- del logging._handlerList[:]
-
- # Do formatters first - they don't refer to anything else
- formatters = config.get('formatters', EMPTY_DICT)
- for name in formatters:
- try:
- formatters[name] = self.configure_formatter(
- formatters[name])
- except Exception as e:
- raise ValueError('Unable to configure '
- 'formatter %r: %s' % (name, e))
- # Next, do filters - they don't refer to anything else, either
- filters = config.get('filters', EMPTY_DICT)
- for name in filters:
- try:
- filters[name] = self.configure_filter(filters[name])
- except Exception as e:
- raise ValueError('Unable to configure '
- 'filter %r: %s' % (name, e))
-
- # Next, do handlers - they refer to formatters and filters
- # As handlers can refer to other handlers, sort the keys
- # to allow a deterministic order of configuration
- handlers = config.get('handlers', EMPTY_DICT)
- for name in sorted(handlers):
- try:
- handler = self.configure_handler(handlers[name])
- handler.name = name
- handlers[name] = handler
- except Exception as e:
- raise ValueError('Unable to configure handler '
- '%r: %s' % (name, e))
- # Next, do loggers - they refer to handlers and filters
-
- #we don't want to lose the existing loggers,
- #since other threads may have pointers to them.
- #existing is set to contain all existing loggers,
- #and as we go through the new configuration we
- #remove any which are configured. At the end,
- #what's left in existing is the set of loggers
- #which were in the previous configuration but
- #which are not in the new configuration.
- root = logging.root
- existing = root.manager.loggerDict.keys()
- #The list needs to be sorted so that we can
- #avoid disabling child loggers of explicitly
- #named loggers. With a sorted list it is easier
- #to find the child loggers.
- existing.sort()
- #We'll keep the list of existing loggers
- #which are children of named loggers here...
- child_loggers = []
- #now set up the new ones...
- loggers = config.get('loggers', EMPTY_DICT)
- for name in loggers:
- if name in existing:
- i = existing.index(name)
- prefixed = name + "."
- pflen = len(prefixed)
- num_existing = len(existing)
- i = i + 1 # look at the entry after name
- while (i < num_existing) and\
- (existing[i][:pflen] == prefixed):
- child_loggers.append(existing[i])
- i = i + 1
- existing.remove(name)
- try:
- self.configure_logger(name, loggers[name])
- except Exception as e:
- raise ValueError('Unable to configure logger '
- '%r: %s' % (name, e))
-
- #Disable any old loggers. There's no point deleting
- #them as other threads may continue to hold references
- #and by disabling them, you stop them doing any logging.
- #However, don't disable children of named loggers, as that's
- #probably not what was intended by the user.
- for log in existing:
- logger = root.manager.loggerDict[log]
- if log in child_loggers:
- logger.level = logging.NOTSET
- logger.handlers = []
- logger.propagate = True
- elif disable_existing:
- logger.disabled = True
-
- # And finally, do the root logger
- root = config.get('root', None)
- if root:
- try:
- self.configure_root(root)
- except Exception as e:
- raise ValueError('Unable to configure root '
- 'logger: %s' % e)
- finally:
- logging._releaseLock()
-
- def configure_formatter(self, config):
- """Configure a formatter from a dictionary."""
- if '()' in config:
- factory = config['()'] # for use in exception handler
- try:
- result = self.configure_custom(config)
- except TypeError as te:
- if "'format'" not in str(te):
- raise
- #Name of parameter changed from fmt to format.
- #Retry with old name.
- #This is so that code can be used with older Python versions
- #(e.g. by Django)
- config['fmt'] = config.pop('format')
- config['()'] = factory
- result = self.configure_custom(config)
- else:
- fmt = config.get('format', None)
- dfmt = config.get('datefmt', None)
- result = logging.Formatter(fmt, dfmt)
- return result
-
- def configure_filter(self, config):
- """Configure a filter from a dictionary."""
- if '()' in config:
- result = self.configure_custom(config)
- else:
- name = config.get('name', '')
- result = logging.Filter(name)
- return result
-
- def add_filters(self, filterer, filters):
- """Add filters to a filterer from a list of names."""
- for f in filters:
- try:
- filterer.addFilter(self.config['filters'][f])
- except Exception as e:
- raise ValueError('Unable to add filter %r: %s' % (f, e))
-
- def configure_handler(self, config):
- """Configure a handler from a dictionary."""
- formatter = config.pop('formatter', None)
- if formatter:
- try:
- formatter = self.config['formatters'][formatter]
- except Exception as e:
- raise ValueError('Unable to set formatter '
- '%r: %s' % (formatter, e))
- level = config.pop('level', None)
- filters = config.pop('filters', None)
- if '()' in config:
- c = config.pop('()')
- if not hasattr(c, '__call__') and type(c) != type:
- c = self.resolve(c)
- factory = c
- else:
- klass = self.resolve(config.pop('class'))
- #Special case for handler which refers to another handler
- if issubclass(klass, logging.handlers.MemoryHandler) and\
- 'target' in config:
- try:
- config['target'] = self.config['handlers'][config['target']]
- except Exception as e:
- raise ValueError('Unable to set target handler '
- '%r: %s' % (config['target'], e))
- elif issubclass(klass, logging.handlers.SMTPHandler) and\
- 'mailhost' in config:
- config['mailhost'] = self.as_tuple(config['mailhost'])
- elif issubclass(klass, logging.handlers.SysLogHandler) and\
- 'address' in config:
- config['address'] = self.as_tuple(config['address'])
- factory = klass
- kwargs = dict([(str(k), config[k]) for k in config if valid_ident(k)])
- try:
- result = factory(**kwargs)
- except TypeError as te:
- if "'stream'" not in str(te):
- raise
- #The argument name changed from strm to stream
- #Retry with old name.
- #This is so that code can be used with older Python versions
- #(e.g. by Django)
- kwargs['strm'] = kwargs.pop('stream')
- result = factory(**kwargs)
- if formatter:
- result.setFormatter(formatter)
- if level is not None:
- result.setLevel(_checkLevel(level))
- if filters:
- self.add_filters(result, filters)
- return result
-
- def add_handlers(self, logger, handlers):
- """Add handlers to a logger from a list of names."""
- for h in handlers:
- try:
- logger.addHandler(self.config['handlers'][h])
- except Exception as e:
- raise ValueError('Unable to add handler %r: %s' % (h, e))
-
- def common_logger_config(self, logger, config, incremental=False):
- """
- Perform configuration which is common to root and non-root loggers.
- """
- level = config.get('level', None)
- if level is not None:
- logger.setLevel(_checkLevel(level))
- if not incremental:
- #Remove any existing handlers
- for h in logger.handlers[:]:
- logger.removeHandler(h)
- handlers = config.get('handlers', None)
- if handlers:
- self.add_handlers(logger, handlers)
- filters = config.get('filters', None)
- if filters:
- self.add_filters(logger, filters)
-
- def configure_logger(self, name, config, incremental=False):
- """Configure a non-root logger from a dictionary."""
- logger = logging.getLogger(name)
- self.common_logger_config(logger, config, incremental)
- propagate = config.get('propagate', None)
- if propagate is not None:
- logger.propagate = propagate
-
- def configure_root(self, config, incremental=False):
- """Configure a root logger from a dictionary."""
- root = logging.getLogger()
- self.common_logger_config(root, config, incremental)
-
-dictConfigClass = DictConfigurator
-
-def dictConfig(config):
- """Configure logging using a dictionary."""
- dictConfigClass(config).configure()
diff --git a/rq/decorators.py b/rq/decorators.py
index 5398a7c..3c8dc83 100644
--- a/rq/decorators.py
+++ b/rq/decorators.py
@@ -5,8 +5,6 @@ if t.TYPE_CHECKING:
from redis import Redis
from .job import Retry
-from rq.compat import string_types
-
from .defaults import DEFAULT_RESULT_TTL
from .queue import Queue
from .utils import backend_class
@@ -53,7 +51,7 @@ class job: # noqa
def __call__(self, f):
@wraps(f)
def delay(*args, **kwargs):
- if isinstance(self.queue, string_types):
+ if isinstance(self.queue, str):
queue = self.queue_class(name=self.queue,
connection=self.connection)
else:
diff --git a/rq/job.py b/rq/job.py
index 84e9345..c41c17d 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -20,13 +20,13 @@ if t.TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
-from rq.compat import as_text, decode_redis_hash, string_types
from .connections import resolve_connection
from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
from .local import LocalStack
from .serializers import resolve_serializer
from .utils import (get_version, import_attribute, parse_timeout, str_to_date,
- utcformat, utcnow, ensure_list, get_call_string)
+ utcformat, utcnow, ensure_list, get_call_string, as_text,
+ decode_redis_hash)
# Serialize pickle dumps using the highest pickle protocol (binary, default
# uses ascii)
@@ -127,7 +127,7 @@ class Job:
job._func_name = func.__name__
elif inspect.isfunction(func) or inspect.isbuiltin(func):
job._func_name = '{0}.{1}'.format(func.__module__, func.__qualname__)
- elif isinstance(func, string_types):
+ elif isinstance(func, str):
job._func_name = as_text(func)
elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance
job._instance = func
@@ -469,7 +469,7 @@ class Job:
def set_id(self, value: str):
"""Sets a job ID for the given job."""
- if not isinstance(value, string_types):
+ if not isinstance(value, str):
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
diff --git a/rq/queue.py b/rq/queue.py
index ef650d8..3cdfcb6 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -5,6 +5,7 @@ import typing as t
import logging
from collections import namedtuple
from datetime import datetime, timezone
+from functools import total_ordering
from redis import WatchError
@@ -12,7 +13,7 @@ if t.TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
-from .compat import as_text, string_types, total_ordering
+from .utils import as_text
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
@@ -94,7 +95,7 @@ class Queue:
# override class attribute job_class if one was passed
if job_class is not None:
- if isinstance(job_class, string_types):
+ if isinstance(job_class, str):
job_class = import_attribute(job_class)
self.job_class = job_class
@@ -487,7 +488,7 @@ class Queue:
* A string, representing the location of a function (must be
meaningful to the import context of the workers)
"""
- if not isinstance(f, string_types) and f.__module__ == '__main__':
+ if not isinstance(f, str) and f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed '
'by workers')
diff --git a/rq/registry.py b/rq/registry.py
index 2d484ec..d988981 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -8,7 +8,7 @@ if t.TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
-from .compat import as_text
+from .utils import as_text
from .connections import resolve_connection
from .defaults import DEFAULT_FAILURE_TTL
from .exceptions import InvalidJobOperation, NoSuchJobError
diff --git a/rq/results.py b/rq/results.py
index 931907c..cf85a5b 100644
--- a/rq/results.py
+++ b/rq/results.py
@@ -1,16 +1,12 @@
-import json
from typing import Any, Optional
import zlib
from base64 import b64decode, b64encode
from datetime import datetime, timezone
from enum import Enum
-from uuid import uuid4
-
from redis import Redis
-from redis.client import Pipeline
-from .compat import decode_redis_hash
+from .utils import decode_redis_hash
from .job import Job
from .serializers import resolve_serializer
from .utils import now
diff --git a/rq/serializers.py b/rq/serializers.py
index babab69..00fd0a7 100644
--- a/rq/serializers.py
+++ b/rq/serializers.py
@@ -2,7 +2,6 @@ from functools import partial
import pickle
import json
-from .compat import string_types
from .utils import import_attribute
@@ -30,7 +29,7 @@ def resolve_serializer(serializer: str):
if not serializer:
return DefaultSerializer
- if isinstance(serializer, string_types):
+ if isinstance(serializer, str):
serializer = import_attribute(serializer)
default_serializer_methods = ('dumps', 'loads')
diff --git a/rq/utils.py b/rq/utils.py
index 5752976..e347ec3 100644
--- a/rq/utils.py
+++ b/rq/utils.py
@@ -20,7 +20,6 @@ if t.TYPE_CHECKING:
from redis.exceptions import ResponseError
-from .compat import as_text, string_types
from .exceptions import TimeoutFormatError
logger = logging.getLogger(__name__)
@@ -126,6 +125,21 @@ class ColorizingStreamHandler(logging.StreamHandler):
return message
+def as_text(v):
+ if v is None:
+ return None
+ elif isinstance(v, bytes):
+ return v.decode('utf-8')
+ elif isinstance(v, str):
+ return v
+ else:
+ raise ValueError('Unknown type %r' % type(v))
+
+
+def decode_redis_hash(h):
+ return dict((as_text(k), h[k]) for k in h)
+
+
def import_attribute(name: str):
"""Returns an attribute from a dotted path name. Example: `path.to.func`.
@@ -243,7 +257,7 @@ def first(iterable: t.Iterable, default=None, key=None):
def is_nonstring_iterable(obj: t.Any) -> bool:
"""Returns whether the obj is an iterable, but not a string"""
- return isinstance(obj, Iterable) and not isinstance(obj, string_types)
+ return isinstance(obj, Iterable) and not isinstance(obj, str)
def ensure_list(obj: t.Any) -> t.List:
@@ -263,7 +277,7 @@ def backend_class(holder, default_name, override=None):
"""Get a backend class using its default attribute name or an override"""
if override is None:
return getattr(holder, default_name)
- elif isinstance(override, string_types):
+ elif isinstance(override, str):
return import_attribute(override)
else:
return override
diff --git a/rq/worker.py b/rq/worker.py
index 28ff6b9..c5ecdf0 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -29,7 +29,7 @@ import redis.exceptions
from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
-from .compat import as_text, string_types, text_type
+from .utils import as_text
from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (CALLBACK_TIMEOUT, DEFAULT_RESULT_TTL,
@@ -193,7 +193,7 @@ class Worker:
queues = [self.queue_class(name=q,
connection=connection,
job_class=self.job_class, serializer=self.serializer)
- if isinstance(q, string_types) else q
+ if isinstance(q, str) else q
for q in ensure_list(queues)]
self.name: str = name or uuid4().hex
@@ -1157,7 +1157,7 @@ class Worker:
self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id)
if rv is not None:
- log_result = "{0!r}".format(as_text(text_type(rv)))
+ log_result = "{0!r}".format(as_text(str(rv)))
self.log.debug('Result: %s', yellow(log_result))
if self.log_result_lifespan:
diff --git a/rq/worker_registration.py b/rq/worker_registration.py
index 787c7a1..c1e1181 100644
--- a/rq/worker_registration.py
+++ b/rq/worker_registration.py
@@ -6,7 +6,7 @@ if t.TYPE_CHECKING:
from .worker import Worker
from .queue import Queue
-from .compat import as_text
+from .utils import as_text
from rq.utils import split_list
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 7307091..e75fb68 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -14,7 +14,6 @@ from multiprocessing import Process
from redis import Redis
from rq import Connection, get_current_job, get_current_connection, Queue
from rq.decorators import job
-from rq.compat import text_type
from rq.worker import HerokuWorker, Worker
@@ -36,7 +35,7 @@ async def say_hello_async(name=None):
def say_hello_unicode(name=None):
"""A job with a single argument and a return value."""
- return text_type(say_hello(name)) # noqa
+ return str(say_hello(name)) # noqa
def do_nothing():
diff --git a/tests/test_job.py b/tests/test_job.py
index 237c5ef..46fbe73 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -7,7 +7,7 @@ from datetime import datetime, timedelta
from redis import WatchError
-from rq.compat import as_text
+from rq.utils import as_text
from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job
from rq.queue import Queue
diff --git a/tests/test_registry.py b/tests/test_registry.py
index d7b607f..28a29ca 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -1,7 +1,7 @@
from datetime import datetime, timedelta
from rq.serializers import JSONSerializer
-from rq.compat import as_text
+from rq.utils import as_text
from rq.defaults import DEFAULT_FAILURE_TTL
from rq.exceptions import InvalidJobOperation
from rq.job import Job, JobStatus, requeue_job
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 6c7b405..1977c93 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -28,7 +28,7 @@ from tests.fixtures import (
)
from rq import Queue, SimpleWorker, Worker, get_current_connection
-from rq.compat import as_text
+from rq.utils import as_text
from rq.job import Job, JobStatus, Retry
from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.results import Result
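
The helpers dropped from rq/compat above are now defined in rq.utils (see the rq/utils.py hunk), and string_types / text_type checks become plain built-in str checks. A minimal, illustrative sketch of the updated call sites follows; the sample byte strings and variable names are hypothetical, only the import path and helper behavior come from the hunks above.

    # Sketch of the post-change import surface; as_text and decode_redis_hash
    # are the functions added to rq/utils.py in this diff.
    from rq.utils import as_text, decode_redis_hash

    assert as_text(b"default") == "default"   # bytes are decoded as UTF-8
    assert as_text("default") == "default"    # str passes through unchanged
    assert as_text(None) is None              # None is preserved

    # Redis hashes arrive with byte keys; only the keys are decoded.
    raw = {b"status": b"finished", b"origin": b"default"}
    assert decode_redis_hash(raw) == {"status": b"finished", "origin": b"default"}

    # rq.compat.string_types / text_type checks become built-in str:
    queue_name = "default"
    assert isinstance(queue_name, str)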