summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/action_engine')
-rw-r--r--taskflow/engines/action_engine/actions/base.py5
-rw-r--r--taskflow/engines/action_engine/compiler.py3
-rw-r--r--taskflow/engines/action_engine/completer.py4
-rw-r--r--taskflow/engines/action_engine/deciders.py9
-rw-r--r--taskflow/engines/action_engine/engine.py10
-rw-r--r--taskflow/engines/action_engine/executor.py4
-rw-r--r--taskflow/engines/action_engine/process_executor.py36
7 files changed, 20 insertions, 51 deletions
diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py
index 3a014e1..e4be825 100644
--- a/taskflow/engines/action_engine/actions/base.py
+++ b/taskflow/engines/action_engine/actions/base.py
@@ -16,13 +16,10 @@
import abc
-import six
-
from taskflow import states
-@six.add_metaclass(abc.ABCMeta)
-class Action(object):
+class Action(object, metaclass=abc.ABCMeta):
"""An action that handles executing, state changes, ... of atoms."""
NO_RESULT = object()
diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py
index c72506a..f1e0e54 100644
--- a/taskflow/engines/action_engine/compiler.py
+++ b/taskflow/engines/action_engine/compiler.py
@@ -18,7 +18,6 @@ import threading
import fasteners
from oslo_utils import excutils
-import six
from taskflow import flow
from taskflow import logging
@@ -165,7 +164,7 @@ class FlowCompiler(object):
decomposed = dict(
(child, self._deep_compiler_func(child, parent=tree_node)[0])
for child in flow)
- decomposed_graphs = list(six.itervalues(decomposed))
+ decomposed_graphs = list(decomposed.values())
graph = gr.merge_graphs(graph, *decomposed_graphs,
overlap_detector=_overlap_occurrence_detector)
for u, v, attr_dict in flow.iter_links():
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index 59a2dbf..028b64a 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -19,7 +19,6 @@ import weakref
from oslo_utils import reflection
from oslo_utils import strutils
-import six
from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import executor as ex
@@ -30,8 +29,7 @@ from taskflow import states as st
LOG = logging.getLogger(__name__)
-@six.add_metaclass(abc.ABCMeta)
-class Strategy(object):
+class Strategy(object, metaclass=abc.ABCMeta):
"""Failure resolution strategy base class."""
strategy = None
diff --git a/taskflow/engines/action_engine/deciders.py b/taskflow/engines/action_engine/deciders.py
index 4c0c04b..b8c399a 100644
--- a/taskflow/engines/action_engine/deciders.py
+++ b/taskflow/engines/action_engine/deciders.py
@@ -17,8 +17,6 @@
import abc
import itertools
-import six
-
from taskflow import deciders
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import traversal
@@ -28,8 +26,7 @@ from taskflow import states
LOG = logging.getLogger(__name__)
-@six.add_metaclass(abc.ABCMeta)
-class Decider(object):
+class Decider(object, metaclass=abc.ABCMeta):
"""Base class for deciders.
Provides interface to be implemented by sub-classes.
@@ -135,7 +132,7 @@ class IgnoreDecider(Decider):
states_intentions = runtime.storage.get_atoms_states(
ed.from_node.name for ed in self._edge_deciders
if ed.kind in compiler.ATOMS)
- for atom_name in six.iterkeys(states_intentions):
+ for atom_name in states_intentions.keys():
atom_state, _atom_intention = states_intentions[atom_name]
if atom_state != states.IGNORE:
history[atom_name] = runtime.storage.get(atom_name)
@@ -155,7 +152,7 @@ class IgnoreDecider(Decider):
LOG.trace("Out of %s deciders there were %s 'do no run it'"
" voters, %s 'do run it' voters and %s 'ignored'"
" voters for transition to atom '%s' given history %s",
- sum(len(eds) for eds in six.itervalues(voters)),
+ sum(len(eds) for eds in voters.values()),
list(ed.from_node.name
for ed in voters['do_not_run_it']),
list(ed.from_node.name for ed in voters['run_it']),
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index cf7c042..4895ac3 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -22,11 +22,11 @@ import threading
from automaton import runners
from concurrent import futures
import fasteners
+import functools
import networkx as nx
from oslo_utils import excutils
from oslo_utils import strutils
from oslo_utils import timeutils
-import six
from taskflow.engines.action_engine import builder
from taskflow.engines.action_engine import compiler
@@ -65,7 +65,7 @@ def _pre_check(check_compiled=True, check_storage_ensured=True,
def decorator(meth):
do_what = meth.__name__
- @six.wraps(meth)
+ @functools.wraps(meth)
def wrapper(self, *args, **kwargs):
if check_compiled and not self._compiled:
raise exc.InvalidState("Can not %s an engine which"
@@ -335,8 +335,8 @@ class ActionEngine(base.Engine):
e_failures = self.storage.get_execute_failures()
r_failures = self.storage.get_revert_failures()
er_failures = itertools.chain(
- six.itervalues(e_failures),
- six.itervalues(r_failures))
+ e_failures.values(),
+ r_failures.values())
failure.Failure.reraise_if_any(er_failures)
finally:
if w is not None:
@@ -594,7 +594,7 @@ String (case insensitive) Executor used
executor_cls = cls._default_executor_cls
# Match the desired executor to a class that will work with it...
desired_executor = options.get('executor')
- if isinstance(desired_executor, six.string_types):
+ if isinstance(desired_executor, str):
matched_executor_cls = None
for m in cls._executor_str_matchers:
if m.matches(desired_executor):
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py
index 5dbaf58..d0c2ecf 100644
--- a/taskflow/engines/action_engine/executor.py
+++ b/taskflow/engines/action_engine/executor.py
@@ -17,7 +17,6 @@
import abc
import futurist
-import six
from taskflow import task as ta
from taskflow.types import failure
@@ -106,8 +105,7 @@ class SerialRetryExecutor(object):
return fut
-@six.add_metaclass(abc.ABCMeta)
-class TaskExecutor(object):
+class TaskExecutor(object, metaclass=abc.ABCMeta):
"""Executes and reverts tasks.
This class takes task and its arguments and executes or reverts it.
diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py
index 4115653..d20f7a3 100644
--- a/taskflow/engines/action_engine/process_executor.py
+++ b/taskflow/engines/action_engine/process_executor.py
@@ -30,7 +30,6 @@ import time
import futurist
from oslo_utils import excutils
-import six
from taskflow.engines.action_engine import executor as base
from taskflow import logging
@@ -80,19 +79,6 @@ SCHEMAS = {
},
}
-# See http://bugs.python.org/issue1457119 for why this is so complex...
-_DECODE_ENCODE_ERRORS = [pickle.PickleError, TypeError]
-try:
- import cPickle
- _DECODE_ENCODE_ERRORS.append(cPickle.PickleError)
- del cPickle
-except (ImportError, AttributeError):
- pass
-_DECODE_ENCODE_ERRORS = tuple(_DECODE_ENCODE_ERRORS)
-
-# Use the best pickle from here on out...
-from six.moves import cPickle as pickle
-
class UnknownSender(Exception):
"""Exception raised when message from unknown sender is recvd."""
@@ -142,13 +128,13 @@ class Reader(object):
])
def __init__(self, auth_key, dispatch_func, msg_limit=-1):
- if not six.callable(dispatch_func):
+ if not callable(dispatch_func):
raise ValueError("Expected provided dispatch function"
" to be callable")
self.auth_key = auth_key
self.dispatch_func = dispatch_func
msg_limiter = iter_utils.iter_forever(msg_limit)
- self.msg_count = six.next(msg_limiter)
+ self.msg_count = next(msg_limiter)
self._msg_limiter = msg_limiter
self._buffer = misc.BytesIO()
self._state = None
@@ -200,7 +186,7 @@ class Reader(object):
# (instead of the receiver discarding it after the fact)...
functools.partial(_decode_message, self.auth_key, data,
self._memory['mac']))
- self.msg_count = six.next(self._msg_limiter)
+ self.msg_count = next(self._msg_limiter)
self._memory.clear()
def _transition(self):
@@ -267,7 +253,7 @@ def _create_random_string(desired_length):
def _calculate_hmac(auth_key, body):
mac = hmac.new(auth_key, body, hashlib.md5).hexdigest()
- if isinstance(mac, six.text_type):
+ if isinstance(mac, str):
mac = mac.encode("ascii")
return mac
@@ -427,11 +413,8 @@ class DispatcherHandler(asyncore.dispatcher):
CHUNK_SIZE = 8192
def __init__(self, sock, addr, dispatcher):
- if six.PY2:
- asyncore.dispatcher.__init__(self, map=dispatcher.map, sock=sock)
- else:
- super(DispatcherHandler, self).__init__(map=dispatcher.map,
- sock=sock)
+ super(DispatcherHandler, self).__init__(map=dispatcher.map,
+ sock=sock)
self.blobs_to_write = list(dispatcher.challenge_pieces)
self.reader = Reader(dispatcher.auth_key, self._dispatch)
self.targets = dispatcher.targets
@@ -508,7 +491,7 @@ class DispatcherHandler(asyncore.dispatcher):
except (IOError, UnknownSender):
LOG.warning("Invalid received message", exc_info=True)
self.handle_close()
- except _DECODE_ENCODE_ERRORS:
+ except (pickle.PickleError, TypeError):
LOG.warning("Badly formatted message", exc_info=True)
self.handle_close()
except (ValueError, su.ValidationError):
@@ -526,10 +509,7 @@ class Dispatcher(asyncore.dispatcher):
MAX_BACKLOG = 5
def __init__(self, map, auth_key, identity):
- if six.PY2:
- asyncore.dispatcher.__init__(self, map=map)
- else:
- super(Dispatcher, self).__init__(map=map)
+ super(Dispatcher, self).__init__(map=map)
self.identity = identity
self.challenge_pieces = _encode_message(auth_key, CHALLENGE,
identity, reverse=True)