summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTakashi Kajinami <tkajinam@redhat.com>2022-05-17 22:56:45 +0900
committerTakashi Kajinami <tkajinam@redhat.com>2022-05-18 16:12:37 +0900
commit44f17d005ff53008144ca7c509bcb1307d66b23f (patch)
treeb03024443b92a78f3cdacfca29f4010d24c8b685
parentb5b69e8110da44a88b2260cd24ada3439f29938e (diff)
downloadtaskflow-44f17d005ff53008144ca7c509bcb1307d66b23f.tar.gz
Remove six
This library no longer supports Python 2, thus usage of six can be removed. This also removes workaround about pickle library used in Python 2 only. Change-Id: I19d298cf0f402d65f0b142dea0bf35cf992332a9
-rw-r--r--doc/source/user/utils.rst5
-rw-r--r--requirements.txt3
-rw-r--r--taskflow/atom.py25
-rw-r--r--taskflow/conductors/backends/impl_executor.py4
-rw-r--r--taskflow/conductors/backends/impl_nonblocking.py3
-rw-r--r--taskflow/conductors/base.py4
-rw-r--r--taskflow/deciders.py4
-rw-r--r--taskflow/engines/action_engine/actions/base.py5
-rw-r--r--taskflow/engines/action_engine/compiler.py3
-rw-r--r--taskflow/engines/action_engine/completer.py4
-rw-r--r--taskflow/engines/action_engine/deciders.py9
-rw-r--r--taskflow/engines/action_engine/engine.py10
-rw-r--r--taskflow/engines/action_engine/executor.py4
-rw-r--r--taskflow/engines/action_engine/process_executor.py36
-rw-r--r--taskflow/engines/base.py5
-rw-r--r--taskflow/engines/helpers.py3
-rw-r--r--taskflow/engines/worker_based/executor.py3
-rw-r--r--taskflow/engines/worker_based/protocol.py12
-rw-r--r--taskflow/engines/worker_based/proxy.py5
-rw-r--r--taskflow/engines/worker_based/types.py13
-rw-r--r--taskflow/examples/example_utils.py2
-rw-r--r--taskflow/examples/jobboard_produce_consume_colors.py24
-rw-r--r--taskflow/examples/parallel_table_multiply.py7
-rw-r--r--taskflow/examples/run_by_iter.py4
-rw-r--r--taskflow/examples/share_engine_thread.py3
-rw-r--r--taskflow/examples/simple_map_reduce.py4
-rw-r--r--taskflow/examples/tox_conductor.py5
-rw-r--r--taskflow/examples/wbe_event_sender.py4
-rw-r--r--taskflow/examples/wbe_mandelbrot.py12
-rw-r--r--taskflow/exceptions.py17
-rw-r--r--taskflow/flow.py6
-rw-r--r--taskflow/jobs/backends/impl_redis.py12
-rw-r--r--taskflow/jobs/backends/impl_zookeeper.py5
-rw-r--r--taskflow/jobs/base.py16
-rw-r--r--taskflow/listeners/base.py4
-rw-r--r--taskflow/listeners/claims.py4
-rw-r--r--taskflow/listeners/timing.py3
-rw-r--r--taskflow/patterns/graph_flow.py7
-rw-r--r--taskflow/persistence/backends/impl_memory.py3
-rw-r--r--taskflow/persistence/backends/impl_sqlalchemy.py13
-rw-r--r--taskflow/persistence/base.py8
-rw-r--r--taskflow/persistence/models.py20
-rw-r--r--taskflow/persistence/path_based.py7
-rw-r--r--taskflow/retry.py6
-rw-r--r--taskflow/storage.py23
-rw-r--r--taskflow/task.py19
-rw-r--r--taskflow/test.py3
-rw-r--r--taskflow/tests/test_examples.py5
-rw-r--r--taskflow/tests/unit/action_engine/test_builder.py15
-rw-r--r--taskflow/tests/unit/jobs/test_redis_job.py3
-rw-r--r--taskflow/tests/unit/jobs/test_zk_job.py11
-rw-r--r--taskflow/tests/unit/persistence/test_sql_persistence.py5
-rw-r--r--taskflow/tests/unit/test_engines.py5
-rw-r--r--taskflow/tests/unit/test_exceptions.py4
-rw-r--r--taskflow/tests/unit/test_failure.py24
-rw-r--r--taskflow/tests/unit/test_listeners.py5
-rw-r--r--taskflow/tests/unit/test_types.py2
-rw-r--r--taskflow/tests/unit/test_utils.py3
-rw-r--r--taskflow/tests/unit/test_utils_binary.py17
-rw-r--r--taskflow/tests/unit/test_utils_iter_utils.py39
-rw-r--r--taskflow/tests/unit/worker_based/test_server.py4
-rw-r--r--taskflow/tests/unit/worker_based/test_worker.py4
-rw-r--r--taskflow/tests/utils.py15
-rw-r--r--taskflow/types/failure.py18
-rw-r--r--taskflow/types/graph.py6
-rw-r--r--taskflow/types/notifier.py11
-rw-r--r--taskflow/types/sets.py4
-rw-r--r--taskflow/types/timing.py4
-rw-r--r--taskflow/types/tree.py7
-rw-r--r--taskflow/utils/banner.py6
-rw-r--r--taskflow/utils/iter_utils.py8
-rw-r--r--taskflow/utils/kazoo_utils.py10
-rw-r--r--taskflow/utils/misc.py27
-rw-r--r--taskflow/utils/mixins.py35
-rw-r--r--taskflow/utils/redis_utils.py4
-rw-r--r--taskflow/utils/threading_utils.py6
-rwxr-xr-xtools/schema_generator.py7
-rw-r--r--tools/speed_test.py5
-rw-r--r--tox.ini1
79 files changed, 266 insertions, 455 deletions
diff --git a/doc/source/user/utils.rst b/doc/source/user/utils.rst
index 3c8c9b1..0a482f9 100644
--- a/doc/source/user/utils.rst
+++ b/doc/source/user/utils.rst
@@ -43,11 +43,6 @@ Miscellaneous
.. automodule:: taskflow.utils.misc
-Mixins
-~~~~~~
-
-.. automodule:: taskflow.utils.mixins
-
Persistence
~~~~~~~~~~~
diff --git a/requirements.txt b/requirements.txt
index f1cdc80..32ded5c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,9 +7,6 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
# Packages needed for using this library.
-# Python 2->3 compatibility library.
-six>=1.10.0 # MIT
-
# For async and/or periodic work
futurist>=1.2.0 # Apache-2.0
diff --git a/taskflow/atom.py b/taskflow/atom.py
index 6e41204..d775722 100644
--- a/taskflow/atom.py
+++ b/taskflow/atom.py
@@ -21,8 +21,6 @@ from collections import abc as cabc
import itertools
from oslo_utils import reflection
-import six
-from six.moves import zip as compat_zip
from taskflow.types import sets
from taskflow.utils import misc
@@ -47,7 +45,7 @@ def _save_as_to_mapping(save_as):
# atom returns is pretty crucial for other later operations.
if save_as is None:
return collections.OrderedDict()
- if isinstance(save_as, six.string_types):
+ if isinstance(save_as, str):
# NOTE(harlowja): this means that your atom will only return one item
# instead of a dictionary-like object or a indexable object (like a
# list or tuple).
@@ -83,7 +81,7 @@ def _build_rebind_dict(req_args, rebind_args):
# the required argument names (if they are the same length then
# this determines how to remap the required argument names to the
# rebound ones).
- rebind = collections.OrderedDict(compat_zip(req_args, rebind_args))
+ rebind = collections.OrderedDict(zip(req_args, rebind_args))
if len(req_args) < len(rebind_args):
# Extra things were rebound, that may be because of *args
# or **kwargs (or some other reason); so just keep all of them
@@ -128,7 +126,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
# Add additional manually provided requirements to required mappings.
if reqs:
- if isinstance(reqs, six.string_types):
+ if isinstance(reqs, str):
required.update({reqs: reqs})
else:
required.update((a, a) for a in reqs)
@@ -139,7 +137,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
# Determine if there are optional arguments that we may or may not take.
if do_infer:
opt_args = sets.OrderedSet(all_args)
- opt_args = opt_args - set(itertools.chain(six.iterkeys(required),
+ opt_args = opt_args - set(itertools.chain(required.keys(),
iter(ignore_list)))
optional = collections.OrderedDict((a, a) for a in opt_args)
else:
@@ -147,7 +145,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
# Check if we are given some extra arguments that we aren't able to accept.
if not reflection.accepts_kwargs(function):
- extra_args = sets.OrderedSet(six.iterkeys(required))
+ extra_args = sets.OrderedSet(required.keys())
extra_args -= all_args
if extra_args:
raise ValueError('Extra arguments given to atom %s: %s'
@@ -161,8 +159,7 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer,
return required, optional
-@six.add_metaclass(abc.ABCMeta)
-class Atom(object):
+class Atom(object, metaclass=abc.ABCMeta):
"""An unit of work that causes a flow to progress (in some manner).
An atom is a named object that operates with input data to perform
@@ -299,13 +296,13 @@ class Atom(object):
# key value, then well there is no rebinding happening, otherwise
# there will be.
rebind = collections.OrderedDict()
- for (arg_name, bound_name) in itertools.chain(six.iteritems(required),
- six.iteritems(optional)):
+ for (arg_name, bound_name) in itertools.chain(required.items(),
+ optional.items()):
rebind.setdefault(arg_name, bound_name)
- requires = sets.OrderedSet(six.itervalues(required))
- optional = sets.OrderedSet(six.itervalues(optional))
+ requires = sets.OrderedSet(required.values())
+ optional = sets.OrderedSet(optional.values())
if self.inject:
- inject_keys = frozenset(six.iterkeys(self.inject))
+ inject_keys = frozenset(self.inject.keys())
requires -= inject_keys
optional -= inject_keys
return rebind, requires, optional
diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py
index ab55821..298bc87 100644
--- a/taskflow/conductors/backends/impl_executor.py
+++ b/taskflow/conductors/backends/impl_executor.py
@@ -20,7 +20,6 @@ import threading
from oslo_utils import excutils
from oslo_utils import timeutils
-import six
from taskflow.conductors import base
from taskflow import exceptions as excp
@@ -34,8 +33,7 @@ from taskflow.utils import misc
LOG = logging.getLogger(__name__)
-@six.add_metaclass(abc.ABCMeta)
-class ExecutorConductor(base.Conductor):
+class ExecutorConductor(base.Conductor, metaclass=abc.ABCMeta):
"""Dispatches jobs from blocking :py:meth:`.run` method to some executor.
This conductor iterates over jobs in the provided jobboard (waiting for
diff --git a/taskflow/conductors/backends/impl_nonblocking.py b/taskflow/conductors/backends/impl_nonblocking.py
index 76893d7..dcdd238 100644
--- a/taskflow/conductors/backends/impl_nonblocking.py
+++ b/taskflow/conductors/backends/impl_nonblocking.py
@@ -13,7 +13,6 @@
# under the License.
import futurist
-import six
from taskflow.conductors.backends import impl_executor
from taskflow.utils import threading_utils as tu
@@ -63,7 +62,7 @@ class NonBlockingConductor(impl_executor.ExecutorConductor):
if executor_factory is None:
self._executor_factory = self._default_executor_factory
else:
- if not six.callable(executor_factory):
+ if not callable(executor_factory):
raise ValueError("Provided keyword argument 'executor_factory'"
" must be callable")
self._executor_factory = executor_factory
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index 1f0da02..2f9296d 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -17,7 +17,6 @@ import os
import threading
import fasteners
-import six
from taskflow import engines
from taskflow import exceptions as excp
@@ -26,8 +25,7 @@ from taskflow.types import notifier
from taskflow.utils import misc
-@six.add_metaclass(abc.ABCMeta)
-class Conductor(object):
+class Conductor(object, metaclass=abc.ABCMeta):
"""Base for all conductor implementations.
Conductors act as entities which extract jobs from a jobboard, assign
diff --git a/taskflow/deciders.py b/taskflow/deciders.py
index c96b321..9e31663 100644
--- a/taskflow/deciders.py
+++ b/taskflow/deciders.py
@@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import six
-
from taskflow.utils import misc
@@ -74,7 +72,7 @@ class Depth(misc.StrEnum):
if isinstance(desired_depth, cls):
# Nothing to do in the first place...
return desired_depth
- if not isinstance(desired_depth, six.string_types):
+ if not isinstance(desired_depth, str):
raise TypeError("Unexpected desired depth type, string type"
" expected, not %s" % type(desired_depth))
try:
diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py
index 3a014e1..e4be825 100644
--- a/taskflow/engines/action_engine/actions/base.py
+++ b/taskflow/engines/action_engine/actions/base.py
@@ -16,13 +16,10 @@
import abc
-import six
-
from taskflow import states
-@six.add_metaclass(abc.ABCMeta)
-class Action(object):
+class Action(object, metaclass=abc.ABCMeta):
"""An action that handles executing, state changes, ... of atoms."""
NO_RESULT = object()
diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py
index c72506a..f1e0e54 100644
--- a/taskflow/engines/action_engine/compiler.py
+++ b/taskflow/engines/action_engine/compiler.py
@@ -18,7 +18,6 @@ import threading
import fasteners
from oslo_utils import excutils
-import six
from taskflow import flow
from taskflow import logging
@@ -165,7 +164,7 @@ class FlowCompiler(object):
decomposed = dict(
(child, self._deep_compiler_func(child, parent=tree_node)[0])
for child in flow)
- decomposed_graphs = list(six.itervalues(decomposed))
+ decomposed_graphs = list(decomposed.values())
graph = gr.merge_graphs(graph, *decomposed_graphs,
overlap_detector=_overlap_occurrence_detector)
for u, v, attr_dict in flow.iter_links():
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index 59a2dbf..028b64a 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -19,7 +19,6 @@ import weakref
from oslo_utils import reflection
from oslo_utils import strutils
-import six
from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import executor as ex
@@ -30,8 +29,7 @@ from taskflow import states as st
LOG = logging.getLogger(__name__)
-@six.add_metaclass(abc.ABCMeta)
-class Strategy(object):
+class Strategy(object, metaclass=abc.ABCMeta):
"""Failure resolution strategy base class."""
strategy = None
diff --git a/taskflow/engines/action_engine/deciders.py b/taskflow/engines/action_engine/deciders.py
index 4c0c04b..b8c399a 100644
--- a/taskflow/engines/action_engine/deciders.py
+++ b/taskflow/engines/action_engine/deciders.py
@@ -17,8 +17,6 @@
import abc
import itertools
-import six
-
from taskflow import deciders
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import traversal
@@ -28,8 +26,7 @@ from taskflow import states
LOG = logging.getLogger(__name__)
-@six.add_metaclass(abc.ABCMeta)
-class Decider(object):
+class Decider(object, metaclass=abc.ABCMeta):
"""Base class for deciders.
Provides interface to be implemented by sub-classes.
@@ -135,7 +132,7 @@ class IgnoreDecider(Decider):
states_intentions = runtime.storage.get_atoms_states(
ed.from_node.name for ed in self._edge_deciders
if ed.kind in compiler.ATOMS)
- for atom_name in six.iterkeys(states_intentions):
+ for atom_name in states_intentions.keys():
atom_state, _atom_intention = states_intentions[atom_name]
if atom_state != states.IGNORE:
history[atom_name] = runtime.storage.get(atom_name)
@@ -155,7 +152,7 @@ class IgnoreDecider(Decider):
LOG.trace("Out of %s deciders there were %s 'do no run it'"
" voters, %s 'do run it' voters and %s 'ignored'"
" voters for transition to atom '%s' given history %s",
- sum(len(eds) for eds in six.itervalues(voters)),
+ sum(len(eds) for eds in voters.values()),
list(ed.from_node.name
for ed in voters['do_not_run_it']),
list(ed.from_node.name for ed in voters['run_it']),
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index cf7c042..4895ac3 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -22,11 +22,11 @@ import threading
from automaton import runners
from concurrent import futures
import fasteners
+import functools
import networkx as nx
from oslo_utils import excutils
from oslo_utils import strutils
from oslo_utils import timeutils
-import six
from taskflow.engines.action_engine import builder
from taskflow.engines.action_engine import compiler
@@ -65,7 +65,7 @@ def _pre_check(check_compiled=True, check_storage_ensured=True,
def decorator(meth):
do_what = meth.__name__
- @six.wraps(meth)
+ @functools.wraps(meth)
def wrapper(self, *args, **kwargs):
if check_compiled and not self._compiled:
raise exc.InvalidState("Can not %s an engine which"
@@ -335,8 +335,8 @@ class ActionEngine(base.Engine):
e_failures = self.storage.get_execute_failures()
r_failures = self.storage.get_revert_failures()
er_failures = itertools.chain(
- six.itervalues(e_failures),
- six.itervalues(r_failures))
+ e_failures.values(),
+ r_failures.values())
failure.Failure.reraise_if_any(er_failures)
finally:
if w is not None:
@@ -594,7 +594,7 @@ String (case insensitive) Executor used
executor_cls = cls._default_executor_cls
# Match the desired executor to a class that will work with it...
desired_executor = options.get('executor')
- if isinstance(desired_executor, six.string_types):
+ if isinstance(desired_executor, str):
matched_executor_cls = None
for m in cls._executor_str_matchers:
if m.matches(desired_executor):
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py
index 5dbaf58..d0c2ecf 100644
--- a/taskflow/engines/action_engine/executor.py
+++ b/taskflow/engines/action_engine/executor.py
@@ -17,7 +17,6 @@
import abc
import futurist
-import six
from taskflow import task as ta
from taskflow.types import failure
@@ -106,8 +105,7 @@ class SerialRetryExecutor(object):
return fut
-@six.add_metaclass(abc.ABCMeta)
-class TaskExecutor(object):
+class TaskExecutor(object, metaclass=abc.ABCMeta):
"""Executes and reverts tasks.
This class takes task and its arguments and executes or reverts it.
diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py
index 4115653..d20f7a3 100644
--- a/taskflow/engines/action_engine/process_executor.py
+++ b/taskflow/engines/action_engine/process_executor.py
@@ -30,7 +30,6 @@ import time
import futurist
from oslo_utils import excutils
-import six
from taskflow.engines.action_engine import executor as base
from taskflow import logging
@@ -80,19 +79,6 @@ SCHEMAS = {
},
}
-# See http://bugs.python.org/issue1457119 for why this is so complex...
-_DECODE_ENCODE_ERRORS = [pickle.PickleError, TypeError]
-try:
- import cPickle
- _DECODE_ENCODE_ERRORS.append(cPickle.PickleError)
- del cPickle
-except (ImportError, AttributeError):
- pass
-_DECODE_ENCODE_ERRORS = tuple(_DECODE_ENCODE_ERRORS)
-
-# Use the best pickle from here on out...
-from six.moves import cPickle as pickle
-
class UnknownSender(Exception):
"""Exception raised when message from unknown sender is recvd."""
@@ -142,13 +128,13 @@ class Reader(object):
])
def __init__(self, auth_key, dispatch_func, msg_limit=-1):
- if not six.callable(dispatch_func):
+ if not callable(dispatch_func):
raise ValueError("Expected provided dispatch function"
" to be callable")
self.auth_key = auth_key
self.dispatch_func = dispatch_func
msg_limiter = iter_utils.iter_forever(msg_limit)
- self.msg_count = six.next(msg_limiter)
+ self.msg_count = next(msg_limiter)
self._msg_limiter = msg_limiter
self._buffer = misc.BytesIO()
self._state = None
@@ -200,7 +186,7 @@ class Reader(object):
# (instead of the receiver discarding it after the fact)...
functools.partial(_decode_message, self.auth_key, data,
self._memory['mac']))
- self.msg_count = six.next(self._msg_limiter)
+ self.msg_count = next(self._msg_limiter)
self._memory.clear()
def _transition(self):
@@ -267,7 +253,7 @@ def _create_random_string(desired_length):
def _calculate_hmac(auth_key, body):
mac = hmac.new(auth_key, body, hashlib.md5).hexdigest()
- if isinstance(mac, six.text_type):
+ if isinstance(mac, str):
mac = mac.encode("ascii")
return mac
@@ -427,11 +413,8 @@ class DispatcherHandler(asyncore.dispatcher):
CHUNK_SIZE = 8192
def __init__(self, sock, addr, dispatcher):
- if six.PY2:
- asyncore.dispatcher.__init__(self, map=dispatcher.map, sock=sock)
- else:
- super(DispatcherHandler, self).__init__(map=dispatcher.map,
- sock=sock)
+ super(DispatcherHandler, self).__init__(map=dispatcher.map,
+ sock=sock)
self.blobs_to_write = list(dispatcher.challenge_pieces)
self.reader = Reader(dispatcher.auth_key, self._dispatch)
self.targets = dispatcher.targets
@@ -508,7 +491,7 @@ class DispatcherHandler(asyncore.dispatcher):
except (IOError, UnknownSender):
LOG.warning("Invalid received message", exc_info=True)
self.handle_close()
- except _DECODE_ENCODE_ERRORS:
+ except (pickle.PickleError, TypeError):
LOG.warning("Badly formatted message", exc_info=True)
self.handle_close()
except (ValueError, su.ValidationError):
@@ -526,10 +509,7 @@ class Dispatcher(asyncore.dispatcher):
MAX_BACKLOG = 5
def __init__(self, map, auth_key, identity):
- if six.PY2:
- asyncore.dispatcher.__init__(self, map=map)
- else:
- super(Dispatcher, self).__init__(map=map)
+ super(Dispatcher, self).__init__(map=map)
self.identity = identity
self.challenge_pieces = _encode_message(auth_key, CHALLENGE,
identity, reverse=True)
diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py
index 84d227e..92ecdbd 100644
--- a/taskflow/engines/base.py
+++ b/taskflow/engines/base.py
@@ -17,14 +17,11 @@
import abc
-import six
-
from taskflow.types import notifier
from taskflow.utils import misc
-@six.add_metaclass(abc.ABCMeta)
-class Engine(object):
+class Engine(object, metaclass=abc.ABCMeta):
"""Base for all engines implementations.
:ivar Engine.notifier: A notification object that will dispatch
diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py
index 9bcbc12..e3f2a8b 100644
--- a/taskflow/engines/helpers.py
+++ b/taskflow/engines/helpers.py
@@ -18,7 +18,6 @@ import contextlib
from oslo_utils import importutils
from oslo_utils import reflection
-import six
import stevedore.driver
from taskflow import exceptions as exc
@@ -68,7 +67,7 @@ def _fetch_factory(factory_name):
def _fetch_validate_factory(flow_factory):
- if isinstance(flow_factory, six.string_types):
+ if isinstance(flow_factory, str):
factory_fun = _fetch_factory(flow_factory)
factory_name = flow_factory
else:
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index 0215037..14139bf 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -18,7 +18,6 @@ import functools
import threading
from oslo_utils import timeutils
-import six
from taskflow.engines.action_engine import executor
from taskflow.engines.worker_based import dispatcher
@@ -141,7 +140,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
if not self._ongoing_requests:
return
with self._ongoing_requests_lock:
- ongoing_requests_uuids = set(six.iterkeys(self._ongoing_requests))
+ ongoing_requests_uuids = set(self._ongoing_requests.keys())
waiting_requests = {}
expired_requests = {}
for request_uuid in ongoing_requests_uuids:
diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py
index 78991ff..a71dbf3 100644
--- a/taskflow/engines/worker_based/protocol.py
+++ b/taskflow/engines/worker_based/protocol.py
@@ -25,7 +25,6 @@ import futurist
from oslo_serialization import jsonutils
from oslo_utils import reflection
from oslo_utils import timeutils
-import six
from taskflow.engines.action_engine import executor
from taskflow import exceptions as excp
@@ -148,8 +147,7 @@ def failure_to_dict(failure):
return failure.to_dict(include_args=False)
-@six.add_metaclass(abc.ABCMeta)
-class Message(object):
+class Message(object, metaclass=abc.ABCMeta):
"""Base class for all message types."""
def __repr__(self):
@@ -292,7 +290,7 @@ class Request(Message):
},
'action': {
"type": "string",
- "enum": list(six.iterkeys(ACTION_TO_EVENT)),
+ "enum": list(ACTION_TO_EVENT.keys()),
},
# Keyword arguments that end up in the revert() or execute()
# method of the remote task.
@@ -367,7 +365,7 @@ class Request(Message):
request['result'] = ('success', result)
if self._failures:
request['failures'] = {}
- for atom_name, failure in six.iteritems(self._failures):
+ for atom_name, failure in self._failures.items():
request['failures'][atom_name] = failure_to_dict(failure)
return request
@@ -431,7 +429,7 @@ class Request(Message):
# Validate all failure dictionaries that *may* be present...
failures = []
if 'failures' in data:
- failures.extend(six.itervalues(data['failures']))
+ failures.extend(data['failures'].values())
result = data.get('result')
if result is not None:
result_data_type, result_data = result
@@ -470,7 +468,7 @@ class Request(Message):
arguments['result'] = result_data
if failures is not None:
arguments['failures'] = {}
- for task, fail_data in six.iteritems(failures):
+ for task, fail_data in failures.items():
arguments['failures'][task] = ft.Failure.from_dict(fail_data)
return _WorkUnit(task_cls, task_name, action, arguments)
diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py
index e58c7a2..eff6cfb 100644
--- a/taskflow/engines/worker_based/proxy.py
+++ b/taskflow/engines/worker_based/proxy.py
@@ -19,7 +19,6 @@ import threading
import kombu
from kombu import exceptions as kombu_exceptions
-import six
from taskflow.engines.worker_based import dispatcher
from taskflow import logging
@@ -85,7 +84,7 @@ class Proxy(object):
ensure_options = self.DEFAULT_RETRY_OPTIONS.copy()
if retry_options is not None:
# Override the defaults with any user provided values...
- for k in set(six.iterkeys(ensure_options)):
+ for k in set(ensure_options.keys()):
if k in retry_options:
# Ensure that the right type is passed in...
val = retry_options[k]
@@ -154,7 +153,7 @@ class Proxy(object):
def publish(self, msg, routing_key, reply_to=None, correlation_id=None):
"""Publish message to the named exchange with given routing key."""
- if isinstance(routing_key, six.string_types):
+ if isinstance(routing_key, str):
routing_keys = [routing_key]
else:
routing_keys = routing_key
diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py
index b2334a3..bad803e 100644
--- a/taskflow/engines/worker_based/types.py
+++ b/taskflow/engines/worker_based/types.py
@@ -19,7 +19,6 @@ import threading
from oslo_utils import reflection
from oslo_utils import timeutils
-import six
from taskflow.engines.worker_based import protocol as pr
from taskflow import logging
@@ -39,7 +38,7 @@ class TopicWorker(object):
def __init__(self, topic, tasks, identity=_NO_IDENTITY):
self.tasks = []
for task in tasks:
- if not isinstance(task, six.string_types):
+ if not isinstance(task, str):
task = reflection.get_class_name(task)
self.tasks.append(task)
self.topic = topic
@@ -47,7 +46,7 @@ class TopicWorker(object):
self.last_seen = None
def performs(self, task):
- if not isinstance(task, six.string_types):
+ if not isinstance(task, str):
task = reflection.get_class_name(task)
return task in self.tasks
@@ -215,18 +214,18 @@ class ProxyWorkerFinder(object):
dead_workers = {}
with self._cond:
now = timeutils.now()
- for topic, worker in six.iteritems(self._workers):
+ for topic, worker in self._workers.items():
if worker.last_seen is None:
continue
secs_since_last_seen = max(0, now - worker.last_seen)
if secs_since_last_seen >= self._worker_expiry:
dead_workers[topic] = (worker, secs_since_last_seen)
- for topic in six.iterkeys(dead_workers):
+ for topic in dead_workers.keys():
self._workers.pop(topic)
if dead_workers:
self._cond.notify_all()
if dead_workers and LOG.isEnabledFor(logging.INFO):
- for worker, secs_since_last_seen in six.itervalues(dead_workers):
+ for worker, secs_since_last_seen in dead_workers.values():
LOG.info("Removed worker '%s' as it has not responded to"
" notification requests in %0.3f seconds",
worker, secs_since_last_seen)
@@ -245,7 +244,7 @@ class ProxyWorkerFinder(object):
"""Gets a worker that can perform a given task."""
available_workers = []
with self._cond:
- for worker in six.itervalues(self._workers):
+ for worker in self._workers.values():
if worker.performs(task):
available_workers.append(worker)
if available_workers:
diff --git a/taskflow/examples/example_utils.py b/taskflow/examples/example_utils.py
index be08e53..1c95ca7 100644
--- a/taskflow/examples/example_utils.py
+++ b/taskflow/examples/example_utils.py
@@ -21,7 +21,7 @@ import shutil
import sys
import tempfile
-from six.moves import urllib_parse
+from urllib import parse as urllib_parse
from taskflow import exceptions
from taskflow.persistence import backends
diff --git a/taskflow/examples/jobboard_produce_consume_colors.py b/taskflow/examples/jobboard_produce_consume_colors.py
index 54983c4..a2553a9 100644
--- a/taskflow/examples/jobboard_produce_consume_colors.py
+++ b/taskflow/examples/jobboard_produce_consume_colors.py
@@ -30,8 +30,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
-import six
-from six.moves import range as compat_range
from zake import fake_client
from taskflow import exceptions as excp
@@ -139,7 +137,7 @@ def producer(ident, client):
name = "P-%s" % (ident)
safe_print(name, "started")
with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
- for i in compat_range(0, PRODUCER_UNITS):
+ for i in range(0, PRODUCER_UNITS):
job_name = "%s-%s" % (name, i)
details = {
'color': random.choice(['red', 'blue']),
@@ -151,22 +149,22 @@ def producer(ident, client):
def main():
- if six.PY3:
- # TODO(harlowja): Hack to make eventlet work right, remove when the
- # following is fixed: https://github.com/eventlet/eventlet/issues/230
- from taskflow.utils import eventlet_utils as _eu # noqa
- try:
- import eventlet as _eventlet # noqa
- except ImportError:
- pass
+ # TODO(harlowja): Hack to make eventlet work right, remove when the
+ # following is fixed: https://github.com/eventlet/eventlet/issues/230
+ from taskflow.utils import eventlet_utils as _eu # noqa
+ try:
+ import eventlet as _eventlet # noqa
+ except ImportError:
+ pass
+
with contextlib.closing(fake_client.FakeClient()) as c:
created = []
- for i in compat_range(0, PRODUCERS):
+ for i in range(0, PRODUCERS):
p = threading_utils.daemon_thread(producer, i + 1, c)
created.append(p)
p.start()
consumed = collections.deque()
- for i in compat_range(0, WORKERS):
+ for i in range(0, WORKERS):
w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
created.append(w)
w.start()
diff --git a/taskflow/examples/parallel_table_multiply.py b/taskflow/examples/parallel_table_multiply.py
index 5cd8e9c..3eaa15e 100644
--- a/taskflow/examples/parallel_table_multiply.py
+++ b/taskflow/examples/parallel_table_multiply.py
@@ -28,7 +28,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
import futurist
-from six.moves import range as compat_range
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
@@ -86,9 +85,9 @@ def main():
tbl = []
cols = random.randint(1, 100)
rows = random.randint(1, 100)
- for _i in compat_range(0, rows):
+ for _i in range(0, rows):
row = []
- for _j in compat_range(0, cols):
+ for _j in range(0, cols):
row.append(random.random())
tbl.append(row)
@@ -112,7 +111,7 @@ def main():
#
# TODO(harlowja): probably easier just to sort instead of search...
computed_tbl = []
- for i in compat_range(0, len(tbl)):
+ for i in range(0, len(tbl)):
for t in f:
if t.index == i:
computed_tbl.append(e.storage.get(t.name))
diff --git a/taskflow/examples/run_by_iter.py b/taskflow/examples/run_by_iter.py
index 37087ec..061ba2e 100644
--- a/taskflow/examples/run_by_iter.py
+++ b/taskflow/examples/run_by_iter.py
@@ -18,8 +18,6 @@ import logging
import os
import sys
-import six
-
logging.basicConfig(level=logging.ERROR)
self_dir = os.path.abspath(os.path.dirname(__file__))
@@ -81,6 +79,6 @@ for f in flows:
while engine_iters:
for it in list(engine_iters):
try:
- print(six.next(it))
+ print(next(it))
except StopIteration:
engine_iters.remove(it)
diff --git a/taskflow/examples/share_engine_thread.py b/taskflow/examples/share_engine_thread.py
index 5223721..6372138 100644
--- a/taskflow/examples/share_engine_thread.py
+++ b/taskflow/examples/share_engine_thread.py
@@ -28,7 +28,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
import futurist
-import six
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
@@ -73,7 +72,7 @@ with futurist.ThreadPoolExecutor() as ex:
# and there is no more engine work to be done.
for it in cloned_iters:
try:
- six.next(it)
+ next(it)
except StopIteration:
try:
iters.remove(it)
diff --git a/taskflow/examples/simple_map_reduce.py b/taskflow/examples/simple_map_reduce.py
index 6476b48..bfaabde 100644
--- a/taskflow/examples/simple_map_reduce.py
+++ b/taskflow/examples/simple_map_reduce.py
@@ -33,8 +33,6 @@ sys.path.insert(0, self_dir)
# produced values and perform a final summation and this result will then be
# printed (and verified to ensure the calculation was as expected).
-import six
-
from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
@@ -51,7 +49,7 @@ class TotalReducer(task.Task):
def execute(self, *args, **kwargs):
# Reduces all mapped summed outputs into a single value.
total = 0
- for (k, v) in six.iteritems(kwargs):
+ for (k, v) in kwargs.items():
# If any other kwargs was passed in, we don't want to use those
# in the calculation of the total...
if k.startswith('reduction_'):
diff --git a/taskflow/examples/tox_conductor.py b/taskflow/examples/tox_conductor.py
index 66e575b..7490650 100644
--- a/taskflow/examples/tox_conductor.py
+++ b/taskflow/examples/tox_conductor.py
@@ -34,7 +34,6 @@ sys.path.insert(0, top_dir)
from oslo_utils import timeutils
from oslo_utils import uuidutils
-import six
from zake import fake_client
from taskflow.conductors import backends as conductors
@@ -114,7 +113,7 @@ def review_iter():
"""Makes reviews (never-ending iterator/generator)."""
review_id_gen = itertools.count(0)
while True:
- review_id = six.next(review_id_gen)
+ review_id = next(review_id_gen)
review = {
'id': review_id,
}
@@ -172,7 +171,7 @@ def generate_reviewer(client, saver, name=NAME):
review_generator = review_iter()
with contextlib.closing(jb):
while not no_more.is_set():
- review = six.next(review_generator)
+ review = next(review_generator)
details = {
'store': {
'review': review,
diff --git a/taskflow/examples/wbe_event_sender.py b/taskflow/examples/wbe_event_sender.py
index 9f9dbd8..2d72683 100644
--- a/taskflow/examples/wbe_event_sender.py
+++ b/taskflow/examples/wbe_event_sender.py
@@ -25,8 +25,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
-from six.moves import range as compat_range
-
from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
@@ -124,7 +122,7 @@ if __name__ == "__main__":
try:
# Create a set of worker threads to simulate actual remote workers...
print('Running %s workers.' % (MEMORY_WORKERS))
- for i in compat_range(0, MEMORY_WORKERS):
+ for i in range(0, MEMORY_WORKERS):
# Give each one its own unique topic name so that they can
# correctly communicate with the engine (they will all share the
# same exchange).
diff --git a/taskflow/examples/wbe_mandelbrot.py b/taskflow/examples/wbe_mandelbrot.py
index 48db5e6..bc954fe 100644
--- a/taskflow/examples/wbe_mandelbrot.py
+++ b/taskflow/examples/wbe_mandelbrot.py
@@ -24,8 +24,6 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
sys.path.insert(0, top_dir)
-from six.moves import range as compat_range
-
from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import unordered_flow as uf
@@ -84,7 +82,7 @@ class MandelCalculator(task.Task):
def mandelbrot(x, y, max_iters):
c = complex(x, y)
z = 0.0j
- for i in compat_range(max_iters):
+ for i in range(max_iters):
z = z * z + c
if (z.real * z.real + z.imag * z.imag) >= 4:
return i
@@ -95,10 +93,10 @@ class MandelCalculator(task.Task):
pixel_size_x = (max_x - min_x) / width
pixel_size_y = (max_y - min_y) / height
block = []
- for y in compat_range(chunk[0], chunk[1]):
+ for y in range(chunk[0], chunk[1]):
row = []
imag = min_y + y * pixel_size_y
- for x in compat_range(0, width):
+ for x in range(0, width):
real = min_x + x * pixel_size_x
row.append(mandelbrot(real, imag, max_iters))
block.append(row)
@@ -133,7 +131,7 @@ def calculate(engine_conf):
# Compose our workflow.
height, _width = IMAGE_SIZE
chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
- for i in compat_range(0, CHUNK_COUNT):
+ for i in range(0, CHUNK_COUNT):
chunk_name = 'chunk_%s' % i
task_name = "calculation_%s" % i
# Break the calculation up into chunk size pieces.
@@ -225,7 +223,7 @@ def create_fractal():
try:
# Create a set of workers to simulate actual remote workers.
print('Running %s workers.' % (WORKERS))
- for i in compat_range(0, WORKERS):
+ for i in range(0, WORKERS):
worker_conf['topic'] = 'calculator_%s' % (i + 1)
worker_topics.append(worker_conf['topic'])
w = worker.Worker(**worker_conf)
diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py
index fab32a9..0d9df10 100644
--- a/taskflow/exceptions.py
+++ b/taskflow/exceptions.py
@@ -14,13 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
+import io
import os
import traceback
from oslo_utils import excutils
from oslo_utils import reflection
-import six
-from taskflow.utils import mixins
def raise_with_cause(exc_cls, message, *args, **kwargs):
@@ -89,7 +88,7 @@ class TaskFlowException(Exception):
if indent < 0:
raise ValueError("Provided 'indent' must be greater than"
" or equal to zero instead of %s" % indent)
- buf = six.StringIO()
+ buf = io.StringIO()
if show_root_class:
buf.write(reflection.get_class_name(self, fully_qualified=False))
buf.write(": ")
@@ -244,7 +243,7 @@ class NotImplementedError(NotImplementedError):
"""
-class WrappedFailure(mixins.StrMixin, Exception):
+class WrappedFailure(Exception):
"""Wraps one or several failure objects.
When exception/s cannot be re-raised (for example, because the value and
@@ -298,17 +297,17 @@ class WrappedFailure(mixins.StrMixin, Exception):
return None
def __bytes__(self):
- buf = six.BytesIO()
+ buf = io.BytesIO()
buf.write(b'WrappedFailure: [')
- causes_gen = (six.binary_type(cause) for cause in self._causes)
+ causes_gen = (bytes(cause) for cause in self._causes)
buf.write(b", ".join(causes_gen))
buf.write(b']')
return buf.getvalue()
- def __unicode__(self):
- buf = six.StringIO()
+ def __str__(self):
+ buf = io.StringIO()
buf.write(u'WrappedFailure: [')
- causes_gen = (six.text_type(cause) for cause in self._causes)
+ causes_gen = (str(cause) for cause in self._causes)
buf.write(u", ".join(causes_gen))
buf.write(u']')
return buf.getvalue()
diff --git a/taskflow/flow.py b/taskflow/flow.py
index 3b974f7..8fac647 100644
--- a/taskflow/flow.py
+++ b/taskflow/flow.py
@@ -17,7 +17,6 @@
import abc
from oslo_utils import reflection
-import six
# Link metadata keys that have inherent/special meaning.
#
@@ -43,8 +42,7 @@ _CHOP_PAT_LEN = len(_CHOP_PAT)
LINK_DECIDER_DEPTH = 'decider_depth'
-@six.add_metaclass(abc.ABCMeta)
-class Flow(object):
+class Flow(object, metaclass=abc.ABCMeta):
"""The base abstract class of all flow implementations.
A flow is a structure that defines relationships between tasks. You can
@@ -60,7 +58,7 @@ class Flow(object):
"""
def __init__(self, name, retry=None):
- self._name = six.text_type(name)
+ self._name = str(name)
self._retry = retry
# NOTE(akarpinska): if retry doesn't have a name,
# the name of its owner will be assigned
diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py
index 8c1e511..30fa0f3 100644
--- a/taskflow/jobs/backends/impl_redis.py
+++ b/taskflow/jobs/backends/impl_redis.py
@@ -30,8 +30,6 @@ from oslo_utils import timeutils
from oslo_utils import uuidutils
from redis import exceptions as redis_exceptions
from redis import sentinel
-import six
-from six.moves import range as compat_range
from taskflow import exceptions as exc
from taskflow.jobs import base
@@ -620,9 +618,9 @@ return cmsgpack.pack(result)
key_pieces = [key_piece]
if more_key_pieces:
key_pieces.extend(more_key_pieces)
- for i in compat_range(0, len(namespace_pieces)):
+ for i in range(0, len(namespace_pieces)):
namespace_pieces[i] = misc.binary_encode(namespace_pieces[i])
- for i in compat_range(0, len(key_pieces)):
+ for i in range(0, len(key_pieces)):
key_pieces[i] = misc.binary_encode(key_pieces[i])
namespace = b"".join(namespace_pieces)
key = self.KEY_PIECE_SEP.join(key_pieces)
@@ -696,7 +694,7 @@ return cmsgpack.pack(result)
'already_claimed': self.SCRIPT_ALREADY_CLAIMED,
}
prepared_scripts = {}
- for n, raw_script_tpl in six.iteritems(self.SCRIPT_TEMPLATES):
+ for n, raw_script_tpl in self.SCRIPT_TEMPLATES.items():
script_tpl = string.Template(raw_script_tpl)
script_blob = script_tpl.substitute(**script_params)
script = self._client.register_script(script_blob)
@@ -761,7 +759,7 @@ return cmsgpack.pack(result)
})
with _translate_failures():
raw_posting = self._dumps(posting)
- raw_job_uuid = six.b(job_uuid)
+ raw_job_uuid = job_uuid.encode('latin-1')
was_posted = bool(self._client.hsetnx(self.listings_key,
raw_job_uuid, raw_posting))
if not was_posted:
@@ -813,7 +811,7 @@ return cmsgpack.pack(result)
with _translate_failures():
raw_postings = self._client.hgetall(self.listings_key)
postings = []
- for raw_job_key, raw_posting in six.iteritems(raw_postings):
+ for raw_job_key, raw_posting in raw_postings.items():
try:
job_data = self._loads(raw_posting)
try:
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index 6ee2222..fc9399a 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -30,7 +30,6 @@ from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
-import six
from taskflow.conductors import base as c_base
from taskflow import exceptions as excp
@@ -373,7 +372,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
if ensure_fresh:
self._force_refresh()
with self._job_cond:
- return sorted(six.itervalues(self._known_jobs))
+ return sorted(self._known_jobs.values())
def _force_refresh(self):
try:
@@ -479,7 +478,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
investigate_paths = []
pending_removals = []
with self._job_cond:
- for path in six.iterkeys(self._known_jobs):
+ for path in self._known_jobs.keys():
if path not in child_paths:
pending_removals.append(path)
for path in child_paths:
diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py
index 4e9bf1a..3bf5198 100644
--- a/taskflow/jobs/base.py
+++ b/taskflow/jobs/base.py
@@ -18,12 +18,12 @@
import abc
import collections
import contextlib
+import functools
import time
import enum
from oslo_utils import timeutils
from oslo_utils import uuidutils
-import six
from taskflow import exceptions as excp
from taskflow import states
@@ -105,8 +105,7 @@ class JobPriority(enum.Enum):
return tuple(values)
-@six.add_metaclass(abc.ABCMeta)
-class Job(object):
+class Job(object, metaclass=abc.ABCMeta):
"""A abstraction that represents a named and trackable unit of work.
A job connects a logbook, a owner, a priority, last modified and created
@@ -195,7 +194,7 @@ class Job(object):
return False
if self.state == states.COMPLETE:
return True
- sleepy_secs = six.next(delay_gen)
+ sleepy_secs = next(delay_gen)
if w is not None:
sleepy_secs = min(w.leftover(), sleepy_secs)
sleep_func(sleepy_secs)
@@ -269,7 +268,7 @@ class Job(object):
self.uuid, self.details)
-class JobBoardIterator(six.Iterator):
+class JobBoardIterator(object):
"""Iterator over a jobboard that iterates over potential jobs.
It provides the following attributes:
@@ -342,8 +341,7 @@ class JobBoardIterator(six.Iterator):
return job
-@six.add_metaclass(abc.ABCMeta)
-class JobBoard(object):
+class JobBoard(object, metaclass=abc.ABCMeta):
"""A place where jobs can be posted, reposted, claimed and transferred.
There can be multiple implementations of this job board, depending on the
@@ -559,9 +557,9 @@ class NotifyingJobBoard(JobBoard):
def check_who(meth):
- @six.wraps(meth)
+ @functools.wraps(meth)
def wrapper(self, job, who, *args, **kwargs):
- if not isinstance(who, six.string_types):
+ if not isinstance(who, str):
raise TypeError("Job applicant must be a string type")
if len(who) == 0:
raise ValueError("Job applicant must be non-empty")
diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py
index f5113a4..462a121 100644
--- a/taskflow/listeners/base.py
+++ b/taskflow/listeners/base.py
@@ -17,7 +17,6 @@
import abc
from oslo_utils import excutils
-import six
from taskflow import logging
from taskflow import states
@@ -161,8 +160,7 @@ class Listener(object):
self._engine, exc_info=True)
-@six.add_metaclass(abc.ABCMeta)
-class DumpingListener(Listener):
+class DumpingListener(Listener, metaclass=abc.ABCMeta):
"""Abstract base class for dumping listeners.
This provides a simple listener that can be attached to an engine which can
diff --git a/taskflow/listeners/claims.py b/taskflow/listeners/claims.py
index dac74ce..000d4a4 100644
--- a/taskflow/listeners/claims.py
+++ b/taskflow/listeners/claims.py
@@ -17,8 +17,6 @@
import logging
import os
-import six
-
from taskflow import exceptions
from taskflow.listeners import base
from taskflow import states
@@ -58,7 +56,7 @@ class CheckingClaimListener(base.Listener):
if on_job_loss is None:
self._on_job_loss = self._suspend_engine_on_loss
else:
- if not six.callable(on_job_loss):
+ if not callable(on_job_loss):
raise ValueError("Custom 'on_job_loss' handler must be"
" callable")
self._on_job_loss = on_job_loss
diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py
index 8634ee0..587be8c 100644
--- a/taskflow/listeners/timing.py
+++ b/taskflow/listeners/timing.py
@@ -15,7 +15,6 @@
# under the License.
import itertools
-import six
import time
from oslo_utils import timeutils
@@ -58,7 +57,7 @@ class DurationListener(base.Listener):
super(DurationListener, self).deregister()
# There should be none that still exist at deregistering time, so log a
# warning if there were any that somehow still got left behind...
- for item_type, timers in six.iteritems(self._timers):
+ for item_type, timers in self._timers.items():
leftover_timers = len(timers)
if leftover_timers:
LOG.warning("%s %s(s) did not enter %s states",
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 52c8178..903e81f 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -15,8 +15,7 @@
# under the License.
import collections
-
-import six
+import functools
from taskflow import deciders as de
from taskflow import exceptions as exc
@@ -109,7 +108,7 @@ class Flow(flow.Flow):
if not self._graph.has_node(v):
raise ValueError("Node '%s' not found to link to" % (v))
if decider is not None:
- if not six.callable(decider):
+ if not callable(decider):
raise ValueError("Decider boolean callback must be callable")
self._swap(self._link(u, v, manual=True,
decider=decider, decider_depth=decider_depth))
@@ -316,7 +315,7 @@ class Flow(flow.Flow):
def _reset_cached_subgraph(func):
"""Resets cached subgraph after execution, in case it was affected."""
- @six.wraps(func)
+ @functools.wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
self._subgraph = None
diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py
index 3b6f10c..d7fdef8 100644
--- a/taskflow/persistence/backends/impl_memory.py
+++ b/taskflow/persistence/backends/impl_memory.py
@@ -21,7 +21,6 @@ import itertools
import posixpath as pp
import fasteners
-import six
from taskflow import exceptions as exc
from taskflow.persistence import path_based
@@ -261,7 +260,7 @@ class FakeFilesystem(object):
if 'target' in node.metadata:
return "%s (link to %s)" % (node.item, node.metadata['target'])
else:
- return six.text_type(node.item)
+ return str(node.item)
def pformat(self):
"""Pretty format this in-memory filesystem."""
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index 2dbe6ff..3ac0f3d 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -22,7 +22,6 @@ import threading
import time
from oslo_utils import strutils
-import six
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import pool as sa_pool
@@ -116,7 +115,7 @@ def _log_statements(log_level, conn, cursor, statement, parameters, *args):
def _in_any(reason, err_haystack):
"""Checks if any elements of the haystack are in the given reason."""
for err in err_haystack:
- if reason.find(six.text_type(err)) != -1:
+ if reason.find(str(err)) != -1:
return True
return False
@@ -173,10 +172,10 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError as ex:
- if _in_any(six.text_type(ex.args[0]), MY_SQL_GONE_WAY_AWAY_ERRORS):
+ if _in_any(str(ex.args[0]), MY_SQL_GONE_WAY_AWAY_ERRORS):
LOG.warning('Got mysql server has gone away', exc_info=True)
raise sa_exc.DisconnectionError("Database server went away")
- elif _in_any(six.text_type(ex.args[0]), POSTGRES_GONE_WAY_AWAY_ERRORS):
+ elif _in_any(str(ex.args[0]), POSTGRES_GONE_WAY_AWAY_ERRORS):
LOG.warning('Got postgres server has gone away', exc_info=True)
raise sa_exc.DisconnectionError("Database server went away")
else:
@@ -285,13 +284,13 @@ class SQLAlchemyBackend(base.Backend):
txn_isolation_levels = conf.pop('isolation_levels',
DEFAULT_TXN_ISOLATION_LEVELS)
level_applied = False
- for (driver, level) in six.iteritems(txn_isolation_levels):
+ for (driver, level) in txn_isolation_levels.items():
if driver == e_url.drivername:
engine_args['isolation_level'] = level
level_applied = True
break
if not level_applied:
- for (driver, level) in six.iteritems(txn_isolation_levels):
+ for (driver, level) in txn_isolation_levels.items():
if e_url.drivername.find(driver) != -1:
engine_args['isolation_level'] = level
break
@@ -362,7 +361,7 @@ class Connection(base.Connection):
def _retry_on_exception(exc):
LOG.warning("Engine connection (validate) failed due to '%s'", exc)
if isinstance(exc, sa_exc.OperationalError) and \
- _is_db_connection_error(six.text_type(exc.args[0])):
+ _is_db_connection_error(str(exc.args[0])):
# We may be able to fix this by retrying...
return True
if isinstance(exc, (sa_exc.TimeoutError,
diff --git a/taskflow/persistence/base.py b/taskflow/persistence/base.py
index 7f08c92..dc041f7 100644
--- a/taskflow/persistence/base.py
+++ b/taskflow/persistence/base.py
@@ -16,13 +16,10 @@
import abc
-import six
-
from taskflow.persistence import models
-@six.add_metaclass(abc.ABCMeta)
-class Backend(object):
+class Backend(object, metaclass=abc.ABCMeta):
"""Base class for persistence backends."""
def __init__(self, conf):
@@ -42,8 +39,7 @@ class Backend(object):
"""Closes any resources this backend has open."""
-@six.add_metaclass(abc.ABCMeta)
-class Connection(object):
+class Connection(object, metaclass=abc.ABCMeta):
"""Base class for backend connections."""
@abc.abstractproperty
diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py
index 0c3385a..8d3235d 100644
--- a/taskflow/persistence/models.py
+++ b/taskflow/persistence/models.py
@@ -21,7 +21,6 @@ import os
from oslo_utils import timeutils
from oslo_utils import uuidutils
-import six
from taskflow import exceptions as exc
from taskflow import states
@@ -259,7 +258,7 @@ class LogBook(object):
return self._name
def __iter__(self):
- for fd in six.itervalues(self._flowdetails_by_id):
+ for fd in self._flowdetails_by_id.values():
yield fd
def __len__(self):
@@ -464,15 +463,14 @@ class FlowDetail(object):
return self._name
def __iter__(self):
- for ad in six.itervalues(self._atomdetails_by_id):
+ for ad in self._atomdetails_by_id.values():
yield ad
def __len__(self):
return len(self._atomdetails_by_id)
-@six.add_metaclass(abc.ABCMeta)
-class AtomDetail(object):
+class AtomDetail(object, metaclass=abc.ABCMeta):
"""A collection of atom specific runtime information and metadata.
This is a base **abstract** class that contains attributes that are used
@@ -887,7 +885,7 @@ class RetryDetail(AtomDetail):
# contain tracebacks, which are not copyable.
for (data, failures) in self.results:
copied_failures = {}
- for (key, failure) in six.iteritems(failures):
+ for (key, failure) in failures.items():
copied_failures[key] = failure
results.append((data, copied_failures))
clone.results = results
@@ -980,7 +978,7 @@ class RetryDetail(AtomDetail):
new_results = []
for (data, failures) in results:
new_failures = {}
- for (key, data) in six.iteritems(failures):
+ for (key, data) in failures.items():
new_failures[key] = ft.Failure.from_dict(data)
new_results.append((data, new_failures))
return new_results
@@ -998,7 +996,7 @@ class RetryDetail(AtomDetail):
new_results = []
for (data, failures) in results:
new_failures = {}
- for (key, failure) in six.iteritems(failures):
+ for (key, failure) in failures.items():
new_failures[key] = failure.to_dict()
new_results.append((data, new_failures))
return new_results
@@ -1041,7 +1039,7 @@ class RetryDetail(AtomDetail):
# contain tracebacks, which are not copyable.
for (data, failures) in other.results:
copied_failures = {}
- for (key, failure) in six.iteritems(failures):
+ for (key, failure) in failures.items():
if deep_copy:
copied_failures[key] = failure.copy()
else:
@@ -1056,8 +1054,8 @@ _DETAIL_TO_NAME = {
TaskDetail: 'TASK_DETAIL',
}
_NAME_TO_DETAIL = dict((name, cls)
- for (cls, name) in six.iteritems(_DETAIL_TO_NAME))
-ATOM_TYPES = list(six.iterkeys(_NAME_TO_DETAIL))
+ for (cls, name) in _DETAIL_TO_NAME.items())
+ATOM_TYPES = list(_NAME_TO_DETAIL.keys())
def atom_detail_class(atom_type):
diff --git a/taskflow/persistence/path_based.py b/taskflow/persistence/path_based.py
index f2d411b..6fa3c05 100644
--- a/taskflow/persistence/path_based.py
+++ b/taskflow/persistence/path_based.py
@@ -15,15 +15,13 @@
# under the License.
import abc
-import six
from taskflow import exceptions as exc
from taskflow.persistence import base
from taskflow.persistence import models
-@six.add_metaclass(abc.ABCMeta)
-class PathBasedBackend(base.Backend):
+class PathBasedBackend(base.Backend, metaclass=abc.ABCMeta):
"""Base class for persistence backends that address data by path
Subclasses of this backend write logbooks, flow details, and atom details
@@ -48,8 +46,7 @@ class PathBasedBackend(base.Backend):
return self._path
-@six.add_metaclass(abc.ABCMeta)
-class PathBasedConnection(base.Connection):
+class PathBasedConnection(base.Connection, metaclass=abc.ABCMeta):
"""Base class for path based backend connections."""
def __init__(self, backend):
diff --git a/taskflow/retry.py b/taskflow/retry.py
index c019815..4935bbf 100644
--- a/taskflow/retry.py
+++ b/taskflow/retry.py
@@ -18,7 +18,6 @@
import abc
import enum
-import six
from taskflow import atom
from taskflow import exceptions as exc
@@ -100,7 +99,7 @@ class History(object):
self._contents[index],
]
for (provided, outcomes) in contents:
- for (owner, outcome) in six.iteritems(outcomes):
+ for (owner, outcome) in outcomes.items():
yield (owner, outcome)
def __len__(self):
@@ -136,8 +135,7 @@ class History(object):
return iter(self._contents)
-@six.add_metaclass(abc.ABCMeta)
-class Retry(atom.Atom):
+class Retry(atom.Atom, metaclass=abc.ABCMeta):
"""A class that can decide how to resolve execution failures.
This abstract base class is used to inherit from and provide different
diff --git a/taskflow/storage.py b/taskflow/storage.py
index 7ddd7ef..b41b5df 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -20,7 +20,6 @@ import functools
import fasteners
from oslo_utils import reflection
from oslo_utils import uuidutils
-import six
import tenacity
from taskflow import exceptions
@@ -335,7 +334,7 @@ class Storage(object):
except exceptions.NotFound:
pass
else:
- names_iter = six.iterkeys(source.results)
+ names_iter = source.results.keys()
self._set_result_mapping(source.name,
dict((name, name) for name in names_iter))
@@ -628,7 +627,7 @@ class Storage(object):
result_mapping = self._result_mappings.get(atom_name)
if not result_mapping:
return
- for name, index in six.iteritems(result_mapping):
+ for name, index in result_mapping.items():
try:
_item_from(container, index)
except _EXTRACTION_EXCEPTIONS:
@@ -731,7 +730,7 @@ class Storage(object):
@fasteners.read_locked
def _get_failures(self, fail_cache_key):
failures = {}
- for atom_name, fail_cache in six.iteritems(self._failures):
+ for atom_name, fail_cache in self._failures.items():
try:
failures[atom_name] = fail_cache[fail_cache_key]
except KeyError:
@@ -771,7 +770,7 @@ class Storage(object):
@fasteners.read_locked
def has_failures(self):
"""Returns true if there are **any** failures in storage."""
- for fail_cache in six.itervalues(self._failures):
+ for fail_cache in self._failures.values():
if fail_cache:
return True
return False
@@ -898,11 +897,11 @@ class Storage(object):
clone.results.update(pairs)
result = self._with_connection(self._save_atom_detail,
source, clone)
- return (self.injector_name, six.iterkeys(result.results))
+ return (self.injector_name, result.results.keys())
def save_transient():
self._transients.update(pairs)
- return (_TRANSIENT_PROVIDER, six.iterkeys(self._transients))
+ return (_TRANSIENT_PROVIDER, self._transients.keys())
if transient:
provider_name, names = save_transient()
@@ -937,7 +936,7 @@ class Storage(object):
if mapping:
provider_mapping.update(mapping)
# Ensure the reverse mapping/index is updated (for faster lookups).
- for name, index in six.iteritems(provider_mapping):
+ for name, index in provider_mapping.items():
entries = self._reverse_mapping.setdefault(name, [])
provider = _Provider(provider_name, index)
if provider not in entries:
@@ -1002,13 +1001,13 @@ class Storage(object):
self._injected_args.get(atom_name, {}),
source.meta.get(META_INJECTED, {}),
]
- missing = set(six.iterkeys(args_mapping))
+ missing = set(args_mapping.keys())
locator = _ProviderLocator(
self._transients, self._fetch_providers,
lambda atom_name:
self._get(atom_name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE))
- for (bound_name, name) in six.iteritems(args_mapping):
+ for (bound_name, name) in args_mapping.items():
if LOG.isEnabledFor(logging.TRACE):
LOG.trace("Looking for %r <= %r for atom '%s'",
bound_name, name, atom_name)
@@ -1041,7 +1040,7 @@ class Storage(object):
if many_handler is None:
many_handler = _many_handler
results = {}
- for name in six.iterkeys(self._reverse_mapping):
+ for name in self._reverse_mapping.keys():
try:
results[name] = self.fetch(name, many_handler=many_handler)
except exceptions.NotFound:
@@ -1079,7 +1078,7 @@ class Storage(object):
self._get(atom_name, 'last_results', 'failure',
_EXECUTE_STATES_WITH_RESULTS, states.EXECUTE)
mapped_args = {}
- for (bound_name, name) in six.iteritems(args_mapping):
+ for (bound_name, name) in args_mapping.items():
if LOG.isEnabledFor(logging.TRACE):
if atom_name:
LOG.trace("Looking for %r <= %r for atom '%s'",
diff --git a/taskflow/task.py b/taskflow/task.py
index 3ff282d..fea62a1 100644
--- a/taskflow/task.py
+++ b/taskflow/task.py
@@ -18,11 +18,9 @@
import abc
import copy
+import functools
from oslo_utils import reflection
-import six
-from six.moves import map as compat_map
-from six.moves import reduce as compat_reduce
from taskflow import atom
from taskflow import logging
@@ -43,8 +41,7 @@ REVERT_FLOW_FAILURES = 'flow_failures'
EVENT_UPDATE_PROGRESS = 'update_progress'
-@six.add_metaclass(abc.ABCMeta)
-class Task(atom.Atom):
+class Task(atom.Atom, metaclass=abc.ABCMeta):
"""An abstraction that defines a potential piece of work.
This potential piece of work is expected to be able to contain
@@ -125,11 +122,11 @@ class FunctorTask(Task):
def __init__(self, execute, name=None, provides=None,
requires=None, auto_extract=True, rebind=None, revert=None,
version=None, inject=None):
- if not six.callable(execute):
+ if not callable(execute):
raise ValueError("Function to use for executing must be"
" callable")
if revert is not None:
- if not six.callable(revert):
+ if not callable(revert):
raise ValueError("Function to use for reverting must"
" be callable")
if name is None:
@@ -175,7 +172,7 @@ class ReduceFunctorTask(Task):
def __init__(self, functor, requires, name=None, provides=None,
auto_extract=True, rebind=None, inject=None):
- if not six.callable(functor):
+ if not callable(functor):
raise ValueError("Function to use for reduce must be callable")
f_args = reflection.get_callable_args(functor)
@@ -204,7 +201,7 @@ class ReduceFunctorTask(Task):
def execute(self, *args, **kwargs):
l = [kwargs[r] for r in self.requires]
- return compat_reduce(self._functor, l)
+ return functools.reduce(self._functor, l)
class MapFunctorTask(Task):
@@ -224,7 +221,7 @@ class MapFunctorTask(Task):
def __init__(self, functor, requires, name=None, provides=None,
auto_extract=True, rebind=None, inject=None):
- if not six.callable(functor):
+ if not callable(functor):
raise ValueError("Function to use for map must be callable")
f_args = reflection.get_callable_args(functor)
@@ -247,4 +244,4 @@ class MapFunctorTask(Task):
def execute(self, *args, **kwargs):
l = [kwargs[r] for r in self.requires]
- return list(compat_map(self._functor, l))
+ return list(map(self._functor, l))
diff --git a/taskflow/test.py b/taskflow/test.py
index a5d880a..8cd8e98 100644
--- a/taskflow/test.py
+++ b/taskflow/test.py
@@ -20,7 +20,6 @@ from unittest import mock
import fixtures
from oslotest import base
-import six
from testtools import compat
from testtools import matchers
@@ -105,7 +104,7 @@ class TestCase(base.BaseTestCase):
# Testtools seems to want equals objects instead of just keys?
compare_dict = {}
- for k in list(six.iterkeys(expected)):
+ for k in list(expected.keys()):
if not isinstance(expected[k], matchers.Equals):
compare_dict[k] = matchers.Equals(expected[k])
else:
diff --git a/taskflow/tests/test_examples.py b/taskflow/tests/test_examples.py
index 6214290..bcb09e1 100644
--- a/taskflow/tests/test_examples.py
+++ b/taskflow/tests/test_examples.py
@@ -34,8 +34,6 @@ import re
import subprocess
import sys
-import six
-
from taskflow import test
ROOT_DIR = os.path.abspath(
@@ -118,8 +116,7 @@ class ExampleAdderMeta(type):
return type.__new__(cls, name, parents, dct)
-@six.add_metaclass(ExampleAdderMeta)
-class ExamplesTestCase(test.TestCase):
+class ExamplesTestCase(test.TestCase, metaclass=ExampleAdderMeta):
"""Runs the examples, and checks the outputs against expected outputs."""
def _check_example(self, name):
diff --git a/taskflow/tests/unit/action_engine/test_builder.py b/taskflow/tests/unit/action_engine/test_builder.py
index 1bb79b8..72b83ec 100644
--- a/taskflow/tests/unit/action_engine/test_builder.py
+++ b/taskflow/tests/unit/action_engine/test_builder.py
@@ -16,7 +16,6 @@
from automaton import exceptions as excp
from automaton import runners
-import six
from taskflow.engines.action_engine import builder
from taskflow.engines.action_engine import compiler
@@ -70,30 +69,30 @@ class BuildersTest(test.TestCase):
flow, initial_state=st.RUNNING)
it = machine_runner.run_iter(builder.START)
- prior_state, new_state = six.next(it)
+ prior_state, new_state = next(it)
self.assertEqual(st.RESUMING, new_state)
self.assertEqual(0, len(memory.failures))
- prior_state, new_state = six.next(it)
+ prior_state, new_state = next(it)
self.assertEqual(st.SCHEDULING, new_state)
self.assertEqual(0, len(memory.failures))
- prior_state, new_state = six.next(it)
+ prior_state, new_state = next(it)
self.assertEqual(st.WAITING, new_state)
self.assertEqual(0, len(memory.failures))
- prior_state, new_state = six.next(it)
+ prior_state, new_state = next(it)
self.assertEqual(st.ANALYZING, new_state)
self.assertEqual(0, len(memory.failures))
- prior_state, new_state = six.next(it)
+ prior_state, new_state = next(it)
self.assertEqual(builder.GAME_OVER, new_state)
self.assertEqual(0, len(memory.failures))
- prior_state, new_state = six.next(it)
+ prior_state, new_state = next(it)
self.assertEqual(st.SUCCESS, new_state)
self.assertEqual(0, len(memory.failures))
- self.assertRaises(StopIteration, six.next, it)
+ self.assertRaises(StopIteration, next, it)
def test_run_iterations_reverted(self):
flow = lf.Flow("root")
diff --git a/taskflow/tests/unit/jobs/test_redis_job.py b/taskflow/tests/unit/jobs/test_redis_job.py
index 9e6bdc7..320827c 100644
--- a/taskflow/tests/unit/jobs/test_redis_job.py
+++ b/taskflow/tests/unit/jobs/test_redis_job.py
@@ -18,7 +18,6 @@ import time
from unittest import mock
from oslo_utils import uuidutils
-import six
import testtools
from taskflow import exceptions as excp
@@ -44,7 +43,7 @@ class RedisJobboardTest(test.TestCase, base.BoardTestMixin):
namespace = uuidutils.generate_uuid()
client = ru.RedisClient()
config = {
- 'namespace': six.b("taskflow-%s" % namespace),
+ 'namespace': ("taskflow-%s" % namespace).encode('latin-1'),
}
kwargs = {
'client': client,
diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py
index 527f253..d93f148 100644
--- a/taskflow/tests/unit/jobs/test_zk_job.py
+++ b/taskflow/tests/unit/jobs/test_zk_job.py
@@ -21,7 +21,6 @@ from kazoo.protocol import paths as k_paths
from kazoo.recipe import watchers
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
-import six
import testtools
from zake import fake_client
from zake import utils as zake_utils
@@ -171,7 +170,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
# Forcefully delete the owner from the backend storage to make
# sure the job becomes unclaimed (this may happen if some admin
# manually deletes the lock).
- paths = list(six.iteritems(self.client.storage.paths))
+ paths = list(self.client.storage.paths.items())
for (path, value) in paths:
if path in self.bad_paths:
continue
@@ -192,7 +191,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
# Forcefully delete the lock from the backend storage to make
# sure the job becomes unclaimed (this may happen if some admin
# manually deletes the lock).
- paths = list(six.iteritems(self.client.storage.paths))
+ paths = list(self.client.storage.paths.items())
for (path, value) in paths:
if path in self.bad_paths:
continue
@@ -215,7 +214,7 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
trashed = []
jobs = []
- paths = list(six.iteritems(self.client.storage.paths))
+ paths = list(self.client.storage.paths.items())
for (path, value) in paths:
if path in self.bad_paths:
continue
@@ -244,14 +243,14 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
# Remove paths that got created due to the running process that we are
# not interested in...
paths = {}
- for (path, data) in six.iteritems(self.client.storage.paths):
+ for (path, data) in self.client.storage.paths.items():
if path in self.bad_paths:
continue
paths[path] = data
# Check the actual data that was posted.
self.assertEqual(1, len(paths))
- path_key = list(six.iterkeys(paths))[0]
+ path_key = list(paths.keys())[0]
self.assertTrue(len(paths[path_key]['data']) > 0)
self.assertDictEqual({
'uuid': posted_job.uuid,
diff --git a/taskflow/tests/unit/persistence/test_sql_persistence.py b/taskflow/tests/unit/persistence/test_sql_persistence.py
index 973bf9f..5a1917c 100644
--- a/taskflow/tests/unit/persistence/test_sql_persistence.py
+++ b/taskflow/tests/unit/persistence/test_sql_persistence.py
@@ -20,7 +20,6 @@ import os
import random
import tempfile
-import six
import testtools
@@ -123,8 +122,8 @@ class SqlitePersistenceTest(test.TestCase, base.PersistenceTestMixin):
self.db_location = None
-@six.add_metaclass(abc.ABCMeta)
-class BackendPersistenceTestMixin(base.PersistenceTestMixin):
+class BackendPersistenceTestMixin(base.PersistenceTestMixin,
+ metaclass=abc.ABCMeta):
"""Specifies a backend type and does required setup and teardown."""
def _get_connection(self):
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index 81f0e08..4454429 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -20,7 +20,6 @@ import functools
import threading
import futurist
-import six
import testtools
import taskflow.engines
@@ -350,7 +349,7 @@ class EngineLinearFlowTest(utils.EngineTestBase):
engine_it = engine.run_iter()
while True:
try:
- engine_state = six.next(engine_it)
+ engine_state = next(engine_it)
if engine_state not in engine_states:
engine_states[engine_state] = 1
else:
@@ -1318,7 +1317,7 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase):
def test_graph_flow_conditional_history(self):
def even_odd_decider(history, allowed):
- total = sum(six.itervalues(history))
+ total = sum(history.values())
if total == allowed:
return True
return False
diff --git a/taskflow/tests/unit/test_exceptions.py b/taskflow/tests/unit/test_exceptions.py
index c542ae8..4c20f7c 100644
--- a/taskflow/tests/unit/test_exceptions.py
+++ b/taskflow/tests/unit/test_exceptions.py
@@ -16,9 +16,6 @@
import string
-import six
-import testtools
-
from taskflow import exceptions as exc
from taskflow import test
@@ -109,7 +106,6 @@ class TestExceptions(test.TestCase):
ex = exc.TaskFlowException("Broken")
self.assertRaises(ValueError, ex.pformat, indent=-100)
- @testtools.skipIf(not six.PY3, 'py3.x is not available')
def test_raise_with_cause(self):
capture = None
try:
diff --git a/taskflow/tests/unit/test_failure.py b/taskflow/tests/unit/test_failure.py
index 217f1b3..1c25ac7 100644
--- a/taskflow/tests/unit/test_failure.py
+++ b/taskflow/tests/unit/test_failure.py
@@ -14,12 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import pickle
import sys
from oslo_utils import encodeutils
-import six
-from six.moves import cPickle as pickle
-import testtools
from taskflow import exceptions
from taskflow import test
@@ -338,23 +336,16 @@ class NonAsciiExceptionsTestCase(test.TestCase):
fail = failure.Failure.from_exception(excp)
self.assertEqual(encodeutils.exception_to_unicode(excp),
fail.exception_str)
- # This is slightly different on py2 vs py3... due to how
- # __str__ or __unicode__ is called and what is expected from
- # both...
- if six.PY2:
- msg = encodeutils.exception_to_unicode(excp)
- expected = 'Failure: ValueError: %s' % msg.encode('utf-8')
- else:
- expected = u'Failure: ValueError: \xc8'
+ expected = u'Failure: ValueError: \xc8'
self.assertEqual(expected, str(fail))
def test_exception_non_ascii_unicode(self):
hi_ru = u'привет'
fail = failure.Failure.from_exception(ValueError(hi_ru))
self.assertEqual(hi_ru, fail.exception_str)
- self.assertIsInstance(fail.exception_str, six.text_type)
+ self.assertIsInstance(fail.exception_str, str)
self.assertEqual(u'Failure: ValueError: %s' % hi_ru,
- six.text_type(fail))
+ str(fail))
def test_wrapped_failure_non_ascii_unicode(self):
hi_cn = u'嗨'
@@ -364,7 +355,7 @@ class NonAsciiExceptionsTestCase(test.TestCase):
wrapped_fail = exceptions.WrappedFailure([fail])
expected_result = (u"WrappedFailure: "
"[Failure: ValueError: %s]" % (hi_cn))
- self.assertEqual(expected_result, six.text_type(wrapped_fail))
+ self.assertEqual(expected_result, str(wrapped_fail))
def test_failure_equality_with_non_ascii_str(self):
bad_string = chr(200)
@@ -379,7 +370,6 @@ class NonAsciiExceptionsTestCase(test.TestCase):
self.assertEqual(fail, copied)
-@testtools.skipIf(not six.PY3, 'this test only works on python 3.x')
class FailureCausesTest(test.TestCase):
@classmethod
@@ -392,7 +382,7 @@ class FailureCausesTest(test.TestCase):
cls._raise_many(messages)
raise e
except RuntimeError as e1:
- six.raise_from(e, e1)
+ raise e from e1
def test_causes(self):
f = None
@@ -467,7 +457,7 @@ class FailureCausesTest(test.TestCase):
self._raise_many(["Still still not working",
"Still not working", "Not working"])
except RuntimeError as e:
- six.raise_from(e, None)
+ raise e from None
except RuntimeError:
f = failure.Failure()
diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py
index 3e2f880..b14f7f2 100644
--- a/taskflow/tests/unit/test_listeners.py
+++ b/taskflow/tests/unit/test_listeners.py
@@ -21,7 +21,6 @@ import time
from oslo_serialization import jsonutils
from oslo_utils import reflection
-import six
from zake import fake_client
import taskflow.engines
@@ -111,7 +110,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin):
children = self.client.storage.get_children("/taskflow",
only_direct=False)
removed = 0
- for p, data in six.iteritems(children):
+ for p, data in children.items():
if p.endswith(".lock"):
self.client.storage.pop(p)
removed += 1
@@ -121,7 +120,7 @@ class TestClaimListener(test.TestCase, EngineMakerMixin):
children = self.client.storage.get_children("/taskflow",
only_direct=False)
altered = 0
- for p, data in six.iteritems(children):
+ for p, data in children.items():
if p.endswith(".lock"):
self.client.set(p, misc.binary_encode(
jsonutils.dumps({'owner': new_owner})))
diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py
index fd97cd6..eca26cd 100644
--- a/taskflow/tests/unit/test_types.py
+++ b/taskflow/tests/unit/test_types.py
@@ -15,7 +15,7 @@
# under the License.
import networkx as nx
-from six.moves import cPickle as pickle
+import pickle
from taskflow import test
from taskflow.types import graph
diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py
index 72bddd2..168e517 100644
--- a/taskflow/tests/unit/test_utils.py
+++ b/taskflow/tests/unit/test_utils.py
@@ -20,7 +20,6 @@ import random
import string
import time
-import six
import testscenarios
from taskflow import test
@@ -241,7 +240,7 @@ class TestCountdownIter(test.TestCase):
def test_invalid_decr(self):
it = misc.countdown_iter(10, -1)
- self.assertRaises(ValueError, six.next, it)
+ self.assertRaises(ValueError, next, it)
class TestMergeUri(test.TestCase):
diff --git a/taskflow/tests/unit/test_utils_binary.py b/taskflow/tests/unit/test_utils_binary.py
index 773f389..0ba930f 100644
--- a/taskflow/tests/unit/test_utils_binary.py
+++ b/taskflow/tests/unit/test_utils_binary.py
@@ -14,24 +14,19 @@
# License for the specific language governing permissions and limitations
# under the License.
-import six
-
from taskflow import test
from taskflow.utils import misc
def _bytes(data):
- if six.PY3:
- return data.encode(encoding='utf-8')
- else:
- return data
+ return data.encode(encoding='utf-8')
class BinaryEncodeTest(test.TestCase):
def _check(self, data, expected_result):
result = misc.binary_encode(data)
- self.assertIsInstance(result, six.binary_type)
+ self.assertIsInstance(result, bytes)
self.assertEqual(expected_result, result)
def test_simple_binary(self):
@@ -50,7 +45,7 @@ class BinaryEncodeTest(test.TestCase):
def test_unicode_other_encoding(self):
result = misc.binary_encode(u'mañana', 'latin-1')
- self.assertIsInstance(result, six.binary_type)
+ self.assertIsInstance(result, bytes)
self.assertEqual(u'mañana'.encode('latin-1'), result)
@@ -58,7 +53,7 @@ class BinaryDecodeTest(test.TestCase):
def _check(self, data, expected_result):
result = misc.binary_decode(data)
- self.assertIsInstance(result, six.text_type)
+ self.assertIsInstance(result, str)
self.assertEqual(expected_result, result)
def test_simple_text(self):
@@ -78,7 +73,7 @@ class BinaryDecodeTest(test.TestCase):
def test_unicode_other_encoding(self):
data = u'mañana'.encode('latin-1')
result = misc.binary_decode(data, 'latin-1')
- self.assertIsInstance(result, six.text_type)
+ self.assertIsInstance(result, str)
self.assertEqual(u'mañana', result)
@@ -94,7 +89,7 @@ class DecodeJsonTest(test.TestCase):
def test_handles_invalid_unicode(self):
self.assertRaises(ValueError, misc.decode_json,
- six.b('{"\xf1": 1}'))
+ '{"\xf1": 1}'.encode('latin-1'))
def test_handles_bad_json(self):
self.assertRaises(ValueError, misc.decode_json,
diff --git a/taskflow/tests/unit/test_utils_iter_utils.py b/taskflow/tests/unit/test_utils_iter_utils.py
index 4a5ff4b..c4109d8 100644
--- a/taskflow/tests/unit/test_utils_iter_utils.py
+++ b/taskflow/tests/unit/test_utils_iter_utils.py
@@ -16,9 +16,6 @@
import string
-import six
-from six.moves import range as compat_range
-
from taskflow import test
from taskflow.utils import iter_utils
@@ -46,22 +43,22 @@ class IterUtilsTest(test.TestCase):
def test_generate_delays(self):
it = iter_utils.generate_delays(1, 60)
- self.assertEqual(1, six.next(it))
- self.assertEqual(2, six.next(it))
- self.assertEqual(4, six.next(it))
- self.assertEqual(8, six.next(it))
- self.assertEqual(16, six.next(it))
- self.assertEqual(32, six.next(it))
- self.assertEqual(60, six.next(it))
- self.assertEqual(60, six.next(it))
+ self.assertEqual(1, next(it))
+ self.assertEqual(2, next(it))
+ self.assertEqual(4, next(it))
+ self.assertEqual(8, next(it))
+ self.assertEqual(16, next(it))
+ self.assertEqual(32, next(it))
+ self.assertEqual(60, next(it))
+ self.assertEqual(60, next(it))
def test_generate_delays_custom_multiplier(self):
it = iter_utils.generate_delays(1, 60, multiplier=4)
- self.assertEqual(1, six.next(it))
- self.assertEqual(4, six.next(it))
- self.assertEqual(16, six.next(it))
- self.assertEqual(60, six.next(it))
- self.assertEqual(60, six.next(it))
+ self.assertEqual(1, next(it))
+ self.assertEqual(4, next(it))
+ self.assertEqual(16, next(it))
+ self.assertEqual(60, next(it))
+ self.assertEqual(60, next(it))
def test_generate_delays_bad(self):
self.assertRaises(ValueError, iter_utils.generate_delays, -1, -1)
@@ -99,7 +96,7 @@ class IterUtilsTest(test.TestCase):
self.assertRaises(ValueError, iter_utils.fill, 2, 2)
def test_fill_many_empty(self):
- result = list(iter_utils.fill(compat_range(0, 50), 500))
+ result = list(iter_utils.fill(range(0, 50), 500))
self.assertEqual(450, sum(1 for x in result if x is None))
self.assertEqual(50, sum(1 for x in result if x is not None))
@@ -134,10 +131,10 @@ class IterUtilsTest(test.TestCase):
def test_count(self):
self.assertEqual(0, iter_utils.count([]))
self.assertEqual(1, iter_utils.count(['a']))
- self.assertEqual(10, iter_utils.count(compat_range(0, 10)))
- self.assertEqual(1000, iter_utils.count(compat_range(0, 1000)))
- self.assertEqual(0, iter_utils.count(compat_range(0)))
- self.assertEqual(0, iter_utils.count(compat_range(-1)))
+ self.assertEqual(10, iter_utils.count(range(0, 10)))
+ self.assertEqual(1000, iter_utils.count(range(0, 1000)))
+ self.assertEqual(0, iter_utils.count(range(0)))
+ self.assertEqual(0, iter_utils.count(range(-1)))
def test_bad_while_is_not(self):
self.assertRaises(ValueError, iter_utils.while_is_not, 2, 'a')
diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py
index 5380204..2261628 100644
--- a/taskflow/tests/unit/worker_based/test_server.py
+++ b/taskflow/tests/unit/worker_based/test_server.py
@@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import six
-
from taskflow.engines.worker_based import endpoint as ep
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import server
@@ -143,7 +141,7 @@ class TestServer(test.MockTestCase):
(self.task.name, self.task.name, 'revert',
dict(arguments=self.task_args,
failures=dict((i, utils.FailureMatcher(f))
- for i, f in six.iteritems(failures)))),
+ for i, f in failures.items()))),
(task_cls, task_name, action, task_args))
@mock.patch("taskflow.engines.worker_based.server.LOG.critical")
diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py
index 0e49a56..3b068a0 100644
--- a/taskflow/tests/unit/worker_based/test_worker.py
+++ b/taskflow/tests/unit/worker_based/test_worker.py
@@ -14,8 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import io
from oslo_utils import reflection
-import six
from taskflow.engines.worker_based import endpoint
from taskflow.engines.worker_based import worker
@@ -66,7 +66,7 @@ class TestWorker(test.MockTestCase):
self.assertEqual(master_mock_calls, self.master_mock.mock_calls)
def test_banner_writing(self):
- buf = six.StringIO()
+ buf = io.StringIO()
w = self.worker()
w.run(banner_writer=buf.write)
w.wait()
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 58cd9ab..6c2f13a 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -21,7 +21,6 @@ import time
from oslo_utils import timeutils
import redis
-import six
from taskflow import exceptions
from taskflow.listeners import capturing
@@ -137,7 +136,7 @@ class GiveBackRevert(task.Task):
result = kwargs.get('result')
# If this somehow fails, timeout, or other don't send back a
# valid result...
- if isinstance(result, six.integer_types):
+ if isinstance(result, int):
return result + 1
@@ -153,12 +152,8 @@ class LongArgNameTask(task.Task):
return long_arg_name
-if six.PY3:
- RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception',
- 'BaseException', 'object']
-else:
- RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception',
- 'BaseException', 'object']
+RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception', 'BaseException',
+ 'object']
class ProvidesRequiresTask(task.Task):
@@ -410,11 +405,11 @@ class WaitForOneFromTask(ProgressingTask):
def __init__(self, name, wait_for, wait_states, **kwargs):
super(WaitForOneFromTask, self).__init__(name, **kwargs)
- if isinstance(wait_for, six.string_types):
+ if isinstance(wait_for, str):
self.wait_for = [wait_for]
else:
self.wait_for = wait_for
- if isinstance(wait_states, six.string_types):
+ if isinstance(wait_states, str):
self.wait_states = [wait_states]
else:
self.wait_states = wait_states
diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py
index fc35bc9..d5e9df1 100644
--- a/taskflow/types/failure.py
+++ b/taskflow/types/failure.py
@@ -16,17 +16,16 @@
import collections
import copy
+import io
import os
import sys
import traceback
from oslo_utils import encodeutils
from oslo_utils import reflection
-import six
from taskflow import exceptions as exc
from taskflow.utils import iter_utils
-from taskflow.utils import mixins
from taskflow.utils import schema_utils as su
@@ -68,7 +67,7 @@ def _are_equal_exc_info_tuples(ei1, ei2):
return tb1 == tb2
-class Failure(mixins.StrMixin):
+class Failure():
"""An immutable object that represents failure.
Failure objects encapsulate exception information so that they can be
@@ -210,7 +209,7 @@ class Failure(mixins.StrMixin):
if kwargs:
raise TypeError(
'Failure.__init__ got unexpected keyword argument(s): %s'
- % ', '.join(six.iterkeys(kwargs)))
+ % ', '.join(kwargs.keys()))
@classmethod
def from_exception(cls, exception):
@@ -343,7 +342,12 @@ class Failure(mixins.StrMixin):
def reraise(self):
"""Re-raise captured exception."""
if self._exc_info:
- six.reraise(*self._exc_info)
+ tp, value, tb = self._exc_info
+ if value is None:
+ value = tp()
+ if value.__traceback__ is not tb:
+ raise value.with_traceback(tb)
+ raise value
else:
raise exc.WrappedFailure([self])
@@ -424,12 +428,12 @@ class Failure(mixins.StrMixin):
self._causes = tuple(self._extract_causes_iter(self.exception))
return self._causes
- def __unicode__(self):
+ def __str__(self):
return self.pformat()
def pformat(self, traceback=False):
"""Pretty formats the failure object into a string."""
- buf = six.StringIO()
+ buf = io.StringIO()
if not self._exc_type_names:
buf.write('Failure: %s' % (self._exception_str))
else:
diff --git a/taskflow/types/graph.py b/taskflow/types/graph.py
index ad518e9..5d69127 100644
--- a/taskflow/types/graph.py
+++ b/taskflow/types/graph.py
@@ -15,11 +15,11 @@
# under the License.
import collections
+import io
import os
import networkx as nx
from networkx.drawing import nx_pydot
-import six
def _common_format(g, edge_notation):
@@ -128,7 +128,7 @@ class DiGraph(nx.DiGraph):
cycles = list(nx.cycles.recursive_simple_cycles(self))
lines.append("Cycles: %s" % len(cycles))
for cycle in cycles:
- buf = six.StringIO()
+ buf = io.StringIO()
buf.write("%s" % (cycle[0]))
for i in range(1, len(cycle)):
buf.write(" --> %s" % (cycle[i]))
@@ -251,7 +251,7 @@ def merge_graphs(graph, *graphs, **kwargs):
tmp_graph = graph
allow_overlaps = kwargs.get('allow_overlaps', False)
overlap_detector = kwargs.get('overlap_detector')
- if overlap_detector is not None and not six.callable(overlap_detector):
+ if overlap_detector is not None and not callable(overlap_detector):
raise ValueError("Overlap detection callback expected to be callable")
elif overlap_detector is None:
overlap_detector = (lambda to_graph, from_graph:
diff --git a/taskflow/types/notifier.py b/taskflow/types/notifier.py
index 7c6af37..f10637e 100644
--- a/taskflow/types/notifier.py
+++ b/taskflow/types/notifier.py
@@ -20,7 +20,6 @@ import copy
import logging
from oslo_utils import reflection
-import six
LOG = logging.getLogger(__name__)
@@ -164,7 +163,7 @@ class Notifier(object):
:rtype: number
"""
count = 0
- for (_event_type, listeners) in six.iteritems(self._topics):
+ for (_event_type, listeners) in self._topics.items():
count += len(listeners)
return count
@@ -235,10 +234,10 @@ class Notifier(object):
:param kwargs: key-value pair arguments
:type kwargs: dictionary
"""
- if not six.callable(callback):
+ if not callable(callback):
raise ValueError("Event callback must be callable")
if details_filter is not None:
- if not six.callable(details_filter):
+ if not callable(details_filter):
raise ValueError("Details filter must be callable")
if not self.can_be_registered(event_type):
raise ValueError("Disallowed event type '%s' can not have a"
@@ -280,7 +279,7 @@ class Notifier(object):
def copy(self):
c = copy.copy(self)
c._topics = collections.defaultdict(list)
- for (event_type, listeners) in six.iteritems(self._topics):
+ for (event_type, listeners) in self._topics.items():
c._topics[event_type] = listeners[:]
return c
@@ -292,7 +291,7 @@ class Notifier(object):
itself wraps a provided callback (and its details filter
callback, if any).
"""
- for event_type, listeners in six.iteritems(self._topics):
+ for event_type, listeners in self._topics.items():
if listeners:
yield (event_type, listeners)
diff --git a/taskflow/types/sets.py b/taskflow/types/sets.py
index 1a33ed3..805f54b 100644
--- a/taskflow/types/sets.py
+++ b/taskflow/types/sets.py
@@ -18,8 +18,6 @@ import collections
from collections import abc
import itertools
-import six
-
# Used for values that don't matter in sets backed by dicts...
_sentinel = object()
@@ -59,7 +57,7 @@ class OrderedSet(abc.Set, abc.Hashable):
return len(self._data)
def __iter__(self):
- for value in six.iterkeys(self._data):
+ for value in self._data.keys():
yield value
def __setstate__(self, items):
diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py
index a160f6e..cdab1cd 100644
--- a/taskflow/types/timing.py
+++ b/taskflow/types/timing.py
@@ -16,8 +16,6 @@
import threading
-import six
-
class Timeout(object):
"""An object which represents a timeout.
@@ -62,7 +60,7 @@ def convert_to_timeout(value=None, default_value=None,
"""
if value is None:
value = default_value
- if isinstance(value, (int, float) + six.string_types):
+ if isinstance(value, (int, float, str)):
return Timeout(float(value), event_factory=event_factory)
elif isinstance(value, Timeout):
return value
diff --git a/taskflow/types/tree.py b/taskflow/types/tree.py
index 3681694..2aef2a1 100644
--- a/taskflow/types/tree.py
+++ b/taskflow/types/tree.py
@@ -15,11 +15,10 @@
# under the License.
import collections
+import io
import itertools
import os
-import six
-
from taskflow.types import graph
from taskflow.utils import iter_utils
from taskflow.utils import misc
@@ -279,9 +278,9 @@ class Node(object):
"""
if stringify_node is None:
# Default to making a unicode string out of the nodes item...
- stringify_node = lambda node: six.text_type(node.item)
+ stringify_node = lambda node: str(node.item)
expected_lines = self.child_count(only_direct=False) + 1
- buff = six.StringIO()
+ buff = io.StringIO()
conn = vertical_conn + horizontal_conn
stop_at_parent = self
for i, node in enumerate(self.dfs_iter(include_self=True), 1):
diff --git a/taskflow/utils/banner.py b/taskflow/utils/banner.py
index a40eea6..0c2adb3 100644
--- a/taskflow/utils/banner.py
+++ b/taskflow/utils/banner.py
@@ -17,8 +17,6 @@
import os
import string
-import six
-
from taskflow.utils import misc
from taskflow import version
@@ -62,7 +60,7 @@ def make_banner(what, chapters):
buf.write_nl(BANNER_HEADER)
if chapters:
buf.write_nl("*%s*" % what)
- chapter_names = sorted(six.iterkeys(chapters))
+ chapter_names = sorted(chapters.keys())
else:
buf.write("*%s*" % what)
chapter_names = []
@@ -73,7 +71,7 @@ def make_banner(what, chapters):
else:
buf.write("%s:" % (chapter_name))
if isinstance(chapter_contents, dict):
- section_names = sorted(six.iterkeys(chapter_contents))
+ section_names = sorted(chapter_contents.keys())
for j, section_name in enumerate(section_names):
if j + 1 < len(section_names):
buf.write_nl(" %s => %s"
diff --git a/taskflow/utils/iter_utils.py b/taskflow/utils/iter_utils.py
index 8591d23..ebf4107 100644
--- a/taskflow/utils/iter_utils.py
+++ b/taskflow/utils/iter_utils.py
@@ -15,15 +15,13 @@
# under the License.
from collections import abc
+import functools
import itertools
-import six
-from six.moves import range as compat_range
-
def _ensure_iterable(func):
- @six.wraps(func)
+ @functools.wraps(func)
def wrapper(it, *args, **kwargs):
if not isinstance(it, abc.Iterable):
raise ValueError("Iterable expected, but '%s' is not"
@@ -147,5 +145,5 @@ def iter_forever(limit):
while True:
yield next(i)
else:
- for i in compat_range(0, limit):
+ for i in range(0, limit):
yield i
diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py
index 2d856bd..505c101 100644
--- a/taskflow/utils/kazoo_utils.py
+++ b/taskflow/utils/kazoo_utils.py
@@ -17,8 +17,6 @@
from kazoo import client
from kazoo import exceptions as k_exc
from oslo_utils import reflection
-import six
-from six.moves import zip as compat_zip
from taskflow import exceptions as exc
from taskflow import logging
@@ -28,11 +26,11 @@ LOG = logging.getLogger(__name__)
def _parse_hosts(hosts):
- if isinstance(hosts, six.string_types):
+ if isinstance(hosts, str):
return hosts.strip()
if isinstance(hosts, (dict)):
host_ports = []
- for (k, v) in six.iteritems(hosts):
+ for (k, v) in hosts.items():
host_ports.append("%s:%s" % (k, v))
hosts = host_ports
if isinstance(hosts, (list, set, tuple)):
@@ -89,7 +87,7 @@ def checked_commit(txn):
return []
results = txn.commit()
failures = []
- for op, result in compat_zip(txn.operations, results):
+ for op, result in zip(txn.operations, results):
if isinstance(result, k_exc.KazooException):
failures.append((op, result))
if len(results) < len(txn.operations):
@@ -211,7 +209,7 @@ def make_client(conf):
if 'connection_retry' in conf:
client_kwargs['connection_retry'] = conf['connection_retry']
hosts = _parse_hosts(conf.get("hosts", "localhost:2181"))
- if not hosts or not isinstance(hosts, six.string_types):
+ if not hosts or not isinstance(hosts, str):
raise TypeError("Invalid hosts format, expected "
"non-empty string/list, not '%s' (%s)"
% (hosts, type(hosts)))
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index 65a93e2..8c0f904 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -18,7 +18,9 @@
import collections.abc
import contextlib
import datetime
+import functools
import inspect
+import io
import os
import re
import socket
@@ -33,13 +35,12 @@ from oslo_utils import encodeutils
from oslo_utils import importutils
from oslo_utils import netutils
from oslo_utils import reflection
-import six
from taskflow.types import failure
UNKNOWN_HOSTNAME = "<unknown>"
-NUMERIC_TYPES = six.integer_types + (float,)
+NUMERIC_TYPES = (int, float)
# NOTE(imelnikov): regular expression to get scheme from URI,
# see RFC 3986 section 3.1
@@ -57,7 +58,7 @@ class StrEnum(str, enum.Enum):
return super(StrEnum, cls).__new__(cls, *args, **kwargs)
-class StringIO(six.StringIO):
+class StringIO(io.StringIO):
"""String buffer with some small additions."""
def write_nl(self, value, linesep=os.linesep):
@@ -65,7 +66,7 @@ class StringIO(six.StringIO):
self.write(linesep)
-class BytesIO(six.BytesIO):
+class BytesIO(io.BytesIO):
"""Byte buffer with some small additions."""
def reset(self):
@@ -118,7 +119,7 @@ def countdown_iter(start_at, decr=1):
def extract_driver_and_conf(conf, conf_key):
"""Common function to get a driver name and its configuration."""
- if isinstance(conf, six.string_types):
+ if isinstance(conf, str):
conf = {conf_key: conf}
maybe_uri = conf[conf_key]
try:
@@ -161,7 +162,7 @@ def merge_uri(uri, conf):
for (k, v, is_not_empty_value_func) in specials:
if is_not_empty_value_func(v):
conf.setdefault(k, v)
- for (k, v) in six.iteritems(uri.params()):
+ for (k, v) in uri.params().items():
conf.setdefault(k, v)
return conf
@@ -182,7 +183,7 @@ def find_subclasses(locations, base_cls, exclude_hidden=True):
derived = set()
for item in locations:
module = None
- if isinstance(item, six.string_types):
+ if isinstance(item, str):
try:
pkg, cls = item.split(':')
except ValueError:
@@ -221,7 +222,7 @@ def pick_first_not_none(*values):
def parse_uri(uri):
"""Parses a uri into its components."""
# Do some basic validation before continuing...
- if not isinstance(uri, six.string_types):
+ if not isinstance(uri, str):
raise TypeError("Can only parse string types to uri data, "
"and not '%s' (%s)" % (uri, type(uri)))
match = _SCHEME_REGEX.match(uri)
@@ -236,7 +237,7 @@ def disallow_when_frozen(excp_cls):
def decorator(f):
- @six.wraps(f)
+ @functools.wraps(f)
def wrapper(self, *args, **kwargs):
if self.frozen:
raise excp_cls()
@@ -274,7 +275,7 @@ def binary_encode(text, encoding='utf-8', errors='strict'):
Does nothing if data is already a binary string (raises on unknown types).
"""
- if isinstance(text, six.binary_type):
+ if isinstance(text, bytes):
return text
else:
return encodeutils.safe_encode(text, encoding=encoding,
@@ -286,7 +287,7 @@ def binary_decode(data, encoding='utf-8', errors='strict'):
Does nothing if data is already a text string (raises on unknown types).
"""
- if isinstance(data, six.text_type):
+ if isinstance(data, str):
return data
else:
return encodeutils.safe_decode(data, incoming=encoding,
@@ -426,7 +427,7 @@ def get_version_string(obj):
if isinstance(obj_version, (list, tuple)):
obj_version = '.'.join(str(item) for item in obj_version)
if obj_version is not None and not isinstance(obj_version,
- six.string_types):
+ str):
obj_version = str(obj_version)
return obj_version
@@ -524,7 +525,7 @@ def is_iterable(obj):
:param obj: object to be tested for iterable
:return: True if object is iterable and is not a string
"""
- return (not isinstance(obj, six.string_types) and
+ return (not isinstance(obj, str) and
isinstance(obj, collections.abc.Iterable))
diff --git a/taskflow/utils/mixins.py b/taskflow/utils/mixins.py
deleted file mode 100644
index 5bb0fa4..0000000
--- a/taskflow/utils/mixins.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import six
-
-
-class StrMixin(object):
- """Mixin that helps deal with the PY2 and PY3 method differences.
-
- http://lucumr.pocoo.org/2011/1/22/forwards-compatible-python/ explains
- why this is quite useful...
- """
-
- if six.PY2:
- def __str__(self):
- try:
- return self.__bytes__()
- except AttributeError:
- return self.__unicode__().encode('utf-8')
- else:
- def __str__(self):
- return self.__unicode__()
diff --git a/taskflow/utils/redis_utils.py b/taskflow/utils/redis_utils.py
index 0d04073..373ab2d 100644
--- a/taskflow/utils/redis_utils.py
+++ b/taskflow/utils/redis_utils.py
@@ -15,15 +15,15 @@
# under the License.
import enum
+import functools
import redis
from redis import exceptions as redis_exceptions
-import six
def _raise_on_closed(meth):
- @six.wraps(meth)
+ @functools.wraps(meth)
def wrapper(self, *args, **kwargs):
if self.closed:
raise redis_exceptions.ConnectionError("Connection has been"
diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py
index ed55468..74b4ebb 100644
--- a/taskflow/utils/threading_utils.py
+++ b/taskflow/utils/threading_utils.py
@@ -14,13 +14,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+import _thread
import collections
import multiprocessing
import threading
-import six
-from six.moves import _thread
-
from taskflow.utils import misc
@@ -106,7 +104,7 @@ class ThreadBundle(object):
before_join, after_join)
for attr_name in builder.fields:
cb = getattr(builder, attr_name)
- if not six.callable(cb):
+ if not callable(cb):
raise ValueError("Provided callback for argument"
" '%s' must be callable" % attr_name)
with self._lock:
diff --git a/tools/schema_generator.py b/tools/schema_generator.py
index 3685a0a..c84e24b 100755
--- a/tools/schema_generator.py
+++ b/tools/schema_generator.py
@@ -17,7 +17,6 @@
import contextlib
import re
-import six
import tabulate
from taskflow.persistence.backends import impl_sqlalchemy
@@ -37,9 +36,9 @@ SCHEMA_QUERY = "pragma table_info(%s)"
def to_bool_string(val):
if isinstance(val, (int, bool)):
- return six.text_type(bool(val))
- if not isinstance(val, six.string_types):
- val = six.text_type(val)
+ return str(bool(val))
+ if not isinstance(val, str):
+ val = str(val)
if val.lower() in ('0', 'false'):
return 'False'
if val.lower() in ('1', 'true'):
diff --git a/tools/speed_test.py b/tools/speed_test.py
index f9da37a..39092e6 100644
--- a/tools/speed_test.py
+++ b/tools/speed_test.py
@@ -18,11 +18,10 @@ Profile a simple engine build/load/compile/prepare/validate/run.
import argparse
import cProfile as profiler
+import io
import pstats
from oslo_utils import timeutils
-import six
-from six.moves import range as compat_range
from taskflow import engines
from taskflow.patterns import linear_flow as lf
@@ -50,7 +49,7 @@ class ProfileIt(object):
def __exit__(self, exc_tp, exc_v, exc_tb):
self.profile.disable()
- buf = six.StringIO()
+ buf = io.StringIO()
ps = pstats.Stats(self.profile, stream=buf)
ps = ps.sort_stats(*self.stats_ordering)
percent_limit = max(0.0, max(1.0, self.args.limit / 100.0))
diff --git a/tox.ini b/tox.ini
index 752cd5e..85031e4 100644
--- a/tox.ini
+++ b/tox.ini
@@ -61,7 +61,6 @@ ignore = E305,E402,E721,E731,E741,W503,W504
[hacking]
import_exceptions =
- six.moves
taskflow.test.mock
unittest.mock