summary | refs | log | tree | commit | diff
path: root/taskflow/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/jobs')
-rw-r--r--  taskflow/jobs/backends/impl_redis.py  | 12
-rw-r--r--  taskflow/jobs/backends/impl_zookeeper.py  | 5
-rw-r--r--  taskflow/jobs/base.py  | 16
3 files changed, 14 insertions, 19 deletions
diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py
index 8c1e511..30fa0f3 100644
--- a/taskflow/jobs/backends/impl_redis.py
+++ b/taskflow/jobs/backends/impl_redis.py
@@ -30,8 +30,6 @@ from oslo_utils import timeutils
from oslo_utils import uuidutils
from redis import exceptions as redis_exceptions
from redis import sentinel
-import six
-from six.moves import range as compat_range
from taskflow import exceptions as exc
from taskflow.jobs import base
@@ -620,9 +618,9 @@ return cmsgpack.pack(result)
key_pieces = [key_piece]
if more_key_pieces:
key_pieces.extend(more_key_pieces)
- for i in compat_range(0, len(namespace_pieces)):
+ for i in range(0, len(namespace_pieces)):
namespace_pieces[i] = misc.binary_encode(namespace_pieces[i])
- for i in compat_range(0, len(key_pieces)):
+ for i in range(0, len(key_pieces)):
key_pieces[i] = misc.binary_encode(key_pieces[i])
namespace = b"".join(namespace_pieces)
key = self.KEY_PIECE_SEP.join(key_pieces)
@@ -696,7 +694,7 @@ return cmsgpack.pack(result)
'already_claimed': self.SCRIPT_ALREADY_CLAIMED,
}
prepared_scripts = {}
- for n, raw_script_tpl in six.iteritems(self.SCRIPT_TEMPLATES):
+ for n, raw_script_tpl in self.SCRIPT_TEMPLATES.items():
script_tpl = string.Template(raw_script_tpl)
script_blob = script_tpl.substitute(**script_params)
script = self._client.register_script(script_blob)
@@ -761,7 +759,7 @@ return cmsgpack.pack(result)
})
with _translate_failures():
raw_posting = self._dumps(posting)
- raw_job_uuid = six.b(job_uuid)
+ raw_job_uuid = job_uuid.encode('latin-1')
was_posted = bool(self._client.hsetnx(self.listings_key,
raw_job_uuid, raw_posting))
if not was_posted:
@@ -813,7 +811,7 @@ return cmsgpack.pack(result)
with _translate_failures():
raw_postings = self._client.hgetall(self.listings_key)
postings = []
- for raw_job_key, raw_posting in six.iteritems(raw_postings):
+ for raw_job_key, raw_posting in raw_postings.items():
try:
job_data = self._loads(raw_posting)
try:
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index 6ee2222..fc9399a 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -30,7 +30,6 @@ from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
-import six
from taskflow.conductors import base as c_base
from taskflow import exceptions as excp
@@ -373,7 +372,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
if ensure_fresh:
self._force_refresh()
with self._job_cond:
- return sorted(six.itervalues(self._known_jobs))
+ return sorted(self._known_jobs.values())
def _force_refresh(self):
try:
@@ -479,7 +478,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
investigate_paths = []
pending_removals = []
with self._job_cond:
- for path in six.iterkeys(self._known_jobs):
+ for path in self._known_jobs.keys():
if path not in child_paths:
pending_removals.append(path)
for path in child_paths:
diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py
index 4e9bf1a..3bf5198 100644
--- a/taskflow/jobs/base.py
+++ b/taskflow/jobs/base.py
@@ -18,12 +18,12 @@
import abc
import collections
import contextlib
+import functools
import time
import enum
from oslo_utils import timeutils
from oslo_utils import uuidutils
-import six
from taskflow import exceptions as excp
from taskflow import states
@@ -105,8 +105,7 @@ class JobPriority(enum.Enum):
return tuple(values)
-@six.add_metaclass(abc.ABCMeta)
-class Job(object):
+class Job(object, metaclass=abc.ABCMeta):
"""A abstraction that represents a named and trackable unit of work.
A job connects a logbook, a owner, a priority, last modified and created
@@ -195,7 +194,7 @@ class Job(object):
return False
if self.state == states.COMPLETE:
return True
- sleepy_secs = six.next(delay_gen)
+ sleepy_secs = next(delay_gen)
if w is not None:
sleepy_secs = min(w.leftover(), sleepy_secs)
sleep_func(sleepy_secs)
@@ -269,7 +268,7 @@ class Job(object):
self.uuid, self.details)
-class JobBoardIterator(six.Iterator):
+class JobBoardIterator(object):
"""Iterator over a jobboard that iterates over potential jobs.
It provides the following attributes:
@@ -342,8 +341,7 @@ class JobBoardIterator(six.Iterator):
return job
-@six.add_metaclass(abc.ABCMeta)
-class JobBoard(object):
+class JobBoard(object, metaclass=abc.ABCMeta):
"""A place where jobs can be posted, reposted, claimed and transferred.
There can be multiple implementations of this job board, depending on the
@@ -559,9 +557,9 @@ class NotifyingJobBoard(JobBoard):
def check_who(meth):
- @six.wraps(meth)
+ @functools.wraps(meth)
def wrapper(self, job, who, *args, **kwargs):
- if not isinstance(who, six.string_types):
+ if not isinstance(who, str):
raise TypeError("Job applicant must be a string type")
if len(who) == 0:
raise ValueError("Job applicant must be non-empty")