diff options
Diffstat (limited to 'taskflow/engines/action_engine')
-rw-r--r-- | taskflow/engines/action_engine/actions/base.py | 5 | ||||
-rw-r--r-- | taskflow/engines/action_engine/compiler.py | 3 | ||||
-rw-r--r-- | taskflow/engines/action_engine/completer.py | 4 | ||||
-rw-r--r-- | taskflow/engines/action_engine/deciders.py | 9 | ||||
-rw-r--r-- | taskflow/engines/action_engine/engine.py | 10 | ||||
-rw-r--r-- | taskflow/engines/action_engine/executor.py | 4 | ||||
-rw-r--r-- | taskflow/engines/action_engine/process_executor.py | 36 |
7 files changed, 20 insertions, 51 deletions
diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py index 3a014e1..e4be825 100644 --- a/taskflow/engines/action_engine/actions/base.py +++ b/taskflow/engines/action_engine/actions/base.py @@ -16,13 +16,10 @@ import abc -import six - from taskflow import states -@six.add_metaclass(abc.ABCMeta) -class Action(object): +class Action(object, metaclass=abc.ABCMeta): """An action that handles executing, state changes, ... of atoms.""" NO_RESULT = object() diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index c72506a..f1e0e54 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -18,7 +18,6 @@ import threading import fasteners from oslo_utils import excutils -import six from taskflow import flow from taskflow import logging @@ -165,7 +164,7 @@ class FlowCompiler(object): decomposed = dict( (child, self._deep_compiler_func(child, parent=tree_node)[0]) for child in flow) - decomposed_graphs = list(six.itervalues(decomposed)) + decomposed_graphs = list(decomposed.values()) graph = gr.merge_graphs(graph, *decomposed_graphs, overlap_detector=_overlap_occurrence_detector) for u, v, attr_dict in flow.iter_links(): diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 59a2dbf..028b64a 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -19,7 +19,6 @@ import weakref from oslo_utils import reflection from oslo_utils import strutils -import six from taskflow.engines.action_engine import compiler as co from taskflow.engines.action_engine import executor as ex @@ -30,8 +29,7 @@ from taskflow import states as st LOG = logging.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) -class Strategy(object): +class Strategy(object, metaclass=abc.ABCMeta): """Failure resolution strategy base class.""" strategy = None diff --git a/taskflow/engines/action_engine/deciders.py b/taskflow/engines/action_engine/deciders.py index 4c0c04b..b8c399a 100644 --- a/taskflow/engines/action_engine/deciders.py +++ b/taskflow/engines/action_engine/deciders.py @@ -17,8 +17,6 @@ import abc import itertools -import six - from taskflow import deciders from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import traversal @@ -28,8 +26,7 @@ from taskflow import states LOG = logging.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) -class Decider(object): +class Decider(object, metaclass=abc.ABCMeta): """Base class for deciders. Provides interface to be implemented by sub-classes. @@ -135,7 +132,7 @@ class IgnoreDecider(Decider): states_intentions = runtime.storage.get_atoms_states( ed.from_node.name for ed in self._edge_deciders if ed.kind in compiler.ATOMS) - for atom_name in six.iterkeys(states_intentions): + for atom_name in states_intentions.keys(): atom_state, _atom_intention = states_intentions[atom_name] if atom_state != states.IGNORE: history[atom_name] = runtime.storage.get(atom_name) @@ -155,7 +152,7 @@ class IgnoreDecider(Decider): LOG.trace("Out of %s deciders there were %s 'do no run it'" " voters, %s 'do run it' voters and %s 'ignored'" " voters for transition to atom '%s' given history %s", - sum(len(eds) for eds in six.itervalues(voters)), + sum(len(eds) for eds in voters.values()), list(ed.from_node.name for ed in voters['do_not_run_it']), list(ed.from_node.name for ed in voters['run_it']), diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index cf7c042..4895ac3 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -22,11 +22,11 @@ import threading from automaton import runners from concurrent import futures import fasteners +import functools import networkx as nx from oslo_utils import excutils from oslo_utils import strutils from oslo_utils import timeutils -import six from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler @@ -65,7 +65,7 @@ def _pre_check(check_compiled=True, check_storage_ensured=True, def decorator(meth): do_what = meth.__name__ - @six.wraps(meth) + @functools.wraps(meth) def wrapper(self, *args, **kwargs): if check_compiled and not self._compiled: raise exc.InvalidState("Can not %s an engine which" @@ -335,8 +335,8 @@ class ActionEngine(base.Engine): e_failures = self.storage.get_execute_failures() r_failures = self.storage.get_revert_failures() er_failures = itertools.chain( - six.itervalues(e_failures), - six.itervalues(r_failures)) + e_failures.values(), + r_failures.values()) failure.Failure.reraise_if_any(er_failures) finally: if w is not None: @@ -594,7 +594,7 @@ String (case insensitive) Executor used executor_cls = cls._default_executor_cls # Match the desired executor to a class that will work with it... desired_executor = options.get('executor') - if isinstance(desired_executor, six.string_types): + if isinstance(desired_executor, str): matched_executor_cls = None for m in cls._executor_str_matchers: if m.matches(desired_executor): diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 5dbaf58..d0c2ecf 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -17,7 +17,6 @@ import abc import futurist -import six from taskflow import task as ta from taskflow.types import failure @@ -106,8 +105,7 @@ class SerialRetryExecutor(object): return fut -@six.add_metaclass(abc.ABCMeta) -class TaskExecutor(object): +class TaskExecutor(object, metaclass=abc.ABCMeta): """Executes and reverts tasks. This class takes task and its arguments and executes or reverts it. diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py index 4115653..d20f7a3 100644 --- a/taskflow/engines/action_engine/process_executor.py +++ b/taskflow/engines/action_engine/process_executor.py @@ -30,7 +30,6 @@ import time import futurist from oslo_utils import excutils -import six from taskflow.engines.action_engine import executor as base from taskflow import logging @@ -80,19 +79,6 @@ SCHEMAS = { }, } -# See http://bugs.python.org/issue1457119 for why this is so complex... -_DECODE_ENCODE_ERRORS = [pickle.PickleError, TypeError] -try: - import cPickle - _DECODE_ENCODE_ERRORS.append(cPickle.PickleError) - del cPickle -except (ImportError, AttributeError): - pass -_DECODE_ENCODE_ERRORS = tuple(_DECODE_ENCODE_ERRORS) - -# Use the best pickle from here on out... -from six.moves import cPickle as pickle - class UnknownSender(Exception): """Exception raised when message from unknown sender is recvd.""" @@ -142,13 +128,13 @@ class Reader(object): ]) def __init__(self, auth_key, dispatch_func, msg_limit=-1): - if not six.callable(dispatch_func): + if not callable(dispatch_func): raise ValueError("Expected provided dispatch function" " to be callable") self.auth_key = auth_key self.dispatch_func = dispatch_func msg_limiter = iter_utils.iter_forever(msg_limit) - self.msg_count = six.next(msg_limiter) + self.msg_count = next(msg_limiter) self._msg_limiter = msg_limiter self._buffer = misc.BytesIO() self._state = None @@ -200,7 +186,7 @@ class Reader(object): # (instead of the receiver discarding it after the fact)... functools.partial(_decode_message, self.auth_key, data, self._memory['mac'])) - self.msg_count = six.next(self._msg_limiter) + self.msg_count = next(self._msg_limiter) self._memory.clear() def _transition(self): @@ -267,7 +253,7 @@ def _create_random_string(desired_length): def _calculate_hmac(auth_key, body): mac = hmac.new(auth_key, body, hashlib.md5).hexdigest() - if isinstance(mac, six.text_type): + if isinstance(mac, str): mac = mac.encode("ascii") return mac @@ -427,11 +413,8 @@ class DispatcherHandler(asyncore.dispatcher): CHUNK_SIZE = 8192 def __init__(self, sock, addr, dispatcher): - if six.PY2: - asyncore.dispatcher.__init__(self, map=dispatcher.map, sock=sock) - else: - super(DispatcherHandler, self).__init__(map=dispatcher.map, - sock=sock) + super(DispatcherHandler, self).__init__(map=dispatcher.map, + sock=sock) self.blobs_to_write = list(dispatcher.challenge_pieces) self.reader = Reader(dispatcher.auth_key, self._dispatch) self.targets = dispatcher.targets @@ -508,7 +491,7 @@ class DispatcherHandler(asyncore.dispatcher): except (IOError, UnknownSender): LOG.warning("Invalid received message", exc_info=True) self.handle_close() - except _DECODE_ENCODE_ERRORS: + except (pickle.PickleError, TypeError): LOG.warning("Badly formatted message", exc_info=True) self.handle_close() except (ValueError, su.ValidationError): @@ -526,10 +509,7 @@ class Dispatcher(asyncore.dispatcher): MAX_BACKLOG = 5 def __init__(self, map, auth_key, identity): - if six.PY2: - asyncore.dispatcher.__init__(self, map=map) - else: - super(Dispatcher, self).__init__(map=map) + super(Dispatcher, self).__init__(map=map) self.identity = identity self.challenge_pieces = _encode_message(auth_key, CHALLENGE, identity, reverse=True) |