diff options
Diffstat (limited to 'taskflow/jobs')
-rw-r--r-- | taskflow/jobs/backends/impl_redis.py | 12 | ||||
-rw-r--r-- | taskflow/jobs/backends/impl_zookeeper.py | 5 | ||||
-rw-r--r-- | taskflow/jobs/base.py | 16 |
3 files changed, 14 insertions, 19 deletions
diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py index 8c1e511..30fa0f3 100644 --- a/taskflow/jobs/backends/impl_redis.py +++ b/taskflow/jobs/backends/impl_redis.py @@ -30,8 +30,6 @@ from oslo_utils import timeutils from oslo_utils import uuidutils from redis import exceptions as redis_exceptions from redis import sentinel -import six -from six.moves import range as compat_range from taskflow import exceptions as exc from taskflow.jobs import base @@ -620,9 +618,9 @@ return cmsgpack.pack(result) key_pieces = [key_piece] if more_key_pieces: key_pieces.extend(more_key_pieces) - for i in compat_range(0, len(namespace_pieces)): + for i in range(0, len(namespace_pieces)): namespace_pieces[i] = misc.binary_encode(namespace_pieces[i]) - for i in compat_range(0, len(key_pieces)): + for i in range(0, len(key_pieces)): key_pieces[i] = misc.binary_encode(key_pieces[i]) namespace = b"".join(namespace_pieces) key = self.KEY_PIECE_SEP.join(key_pieces) @@ -696,7 +694,7 @@ return cmsgpack.pack(result) 'already_claimed': self.SCRIPT_ALREADY_CLAIMED, } prepared_scripts = {} - for n, raw_script_tpl in six.iteritems(self.SCRIPT_TEMPLATES): + for n, raw_script_tpl in self.SCRIPT_TEMPLATES.items(): script_tpl = string.Template(raw_script_tpl) script_blob = script_tpl.substitute(**script_params) script = self._client.register_script(script_blob) @@ -761,7 +759,7 @@ return cmsgpack.pack(result) }) with _translate_failures(): raw_posting = self._dumps(posting) - raw_job_uuid = six.b(job_uuid) + raw_job_uuid = job_uuid.encode('latin-1') was_posted = bool(self._client.hsetnx(self.listings_key, raw_job_uuid, raw_posting)) if not was_posted: @@ -813,7 +811,7 @@ return cmsgpack.pack(result) with _translate_failures(): raw_postings = self._client.hgetall(self.listings_key) postings = [] - for raw_job_key, raw_posting in six.iteritems(raw_postings): + for raw_job_key, raw_posting in raw_postings.items(): try: job_data = self._loads(raw_posting) try: diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 6ee2222..fc9399a 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -30,7 +30,6 @@ from oslo_serialization import jsonutils from oslo_utils import excutils from oslo_utils import timeutils from oslo_utils import uuidutils -import six from taskflow.conductors import base as c_base from taskflow import exceptions as excp @@ -373,7 +372,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): if ensure_fresh: self._force_refresh() with self._job_cond: - return sorted(six.itervalues(self._known_jobs)) + return sorted(self._known_jobs.values()) def _force_refresh(self): try: @@ -479,7 +478,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): investigate_paths = [] pending_removals = [] with self._job_cond: - for path in six.iterkeys(self._known_jobs): + for path in self._known_jobs.keys(): if path not in child_paths: pending_removals.append(path) for path in child_paths: diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py index 4e9bf1a..3bf5198 100644 --- a/taskflow/jobs/base.py +++ b/taskflow/jobs/base.py @@ -18,12 +18,12 @@ import abc import collections import contextlib +import functools import time import enum from oslo_utils import timeutils from oslo_utils import uuidutils -import six from taskflow import exceptions as excp from taskflow import states @@ -105,8 +105,7 @@ class JobPriority(enum.Enum): return tuple(values) -@six.add_metaclass(abc.ABCMeta) -class Job(object): +class Job(object, metaclass=abc.ABCMeta): """A abstraction that represents a named and trackable unit of work. A job connects a logbook, a owner, a priority, last modified and created @@ -195,7 +194,7 @@ class Job(object): return False if self.state == states.COMPLETE: return True - sleepy_secs = six.next(delay_gen) + sleepy_secs = next(delay_gen) if w is not None: sleepy_secs = min(w.leftover(), sleepy_secs) sleep_func(sleepy_secs) @@ -269,7 +268,7 @@ class Job(object): self.uuid, self.details) -class JobBoardIterator(six.Iterator): +class JobBoardIterator(object): """Iterator over a jobboard that iterates over potential jobs. It provides the following attributes: @@ -342,8 +341,7 @@ class JobBoardIterator(six.Iterator): return job -@six.add_metaclass(abc.ABCMeta) -class JobBoard(object): +class JobBoard(object, metaclass=abc.ABCMeta): """A place where jobs can be posted, reposted, claimed and transferred. There can be multiple implementations of this job board, depending on the @@ -559,9 +557,9 @@ class NotifyingJobBoard(JobBoard): def check_who(meth): - @six.wraps(meth) + @functools.wraps(meth) def wrapper(self, job, who, *args, **kwargs): - if not isinstance(who, six.string_types): + if not isinstance(who, str): raise TypeError("Job applicant must be a string type") if len(who) == 0: raise ValueError("Job applicant must be non-empty") |