summaryrefslogtreecommitdiff
path: root/taskflow
diff options
context:
space:
mode:
authorJeremy Stanley <fungi@yuggoth.org>2014-03-31 12:51:17 +0000
committerJeremy Stanley <fungi@yuggoth.org>2014-03-31 12:59:31 +0000
commitc95bf4165d41241fdbbfefe62653450eb7e5e838 (patch)
treeb949126192d60a7ddb43e3d2db1779947779ea27 /taskflow
parentc2a4989ea88000dc3a532588a96bab23c233d5fb (diff)
downloadtaskflow-c95bf4165d41241fdbbfefe62653450eb7e5e838.tar.gz
Revert "Move taskflow.utils.misc.Failure to its own module"
This reverts commit 42ca240e8157b840c298d14fbf478ae570376633 which was a breaking change in a library consumed by other OpenStack projects with no deprecation or backwards compatibility considerations. It was able to merge because openstack/taskflow is apparently not yet part of the integrated gate via the proposed I202f4809afd689155e2cc4a00fc704fd772a0e92 change. Change-Id: I96cf36dc317499df91e43502efc85221f8177395 Closes-Bug: #1300161
Diffstat (limited to 'taskflow')
-rw-r--r--taskflow/engines/action_engine/engine.py3
-rw-r--r--taskflow/engines/action_engine/executor.py6
-rw-r--r--taskflow/engines/action_engine/graph_action.py6
-rw-r--r--taskflow/engines/action_engine/retry_action.py6
-rw-r--r--taskflow/engines/action_engine/task_action.py6
-rw-r--r--taskflow/engines/worker_based/executor.py7
-rw-r--r--taskflow/engines/worker_based/protocol.py11
-rw-r--r--taskflow/engines/worker_based/server.py27
-rw-r--r--taskflow/examples/wrapped_exception.py21
-rw-r--r--taskflow/failure.py231
-rw-r--r--taskflow/listeners/base.py3
-rw-r--r--taskflow/persistence/backends/impl_sqlalchemy.py6
-rw-r--r--taskflow/persistence/logbook.py26
-rw-r--r--taskflow/storage.py5
-rw-r--r--taskflow/test.py10
-rw-r--r--taskflow/tests/unit/persistence/base.py16
-rw-r--r--taskflow/tests/unit/test_action_engine.py4
-rw-r--r--taskflow/tests/unit/test_retries.py8
-rw-r--r--taskflow/tests/unit/test_storage.py40
-rw-r--r--taskflow/tests/unit/test_utils.py5
-rw-r--r--taskflow/tests/unit/test_utils_failure.py58
-rw-r--r--taskflow/tests/unit/worker_based/test_executor.py8
-rw-r--r--taskflow/tests/unit/worker_based/test_protocol.py14
-rw-r--r--taskflow/tests/unit/worker_based/test_server.py26
-rw-r--r--taskflow/tests/utils.py16
-rw-r--r--taskflow/utils/misc.py196
26 files changed, 368 insertions, 397 deletions
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 83d5d5f..2e43f84 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -24,7 +24,6 @@ from taskflow.engines.action_engine import task_action
from taskflow.engines import base
from taskflow import exceptions as exc
-from taskflow import failure
from taskflow.openstack.common import excutils
from taskflow import retry
from taskflow import states
@@ -116,7 +115,7 @@ class ActionEngine(base.EngineBase):
self._change_state(state)
if state != states.SUSPENDED and state != states.SUCCESS:
failures = self.storage.get_failures()
- failure.Failure.reraise_if_any(failures.values())
+ misc.Failure.reraise_if_any(failures.values())
@lock_utils.locked(lock='_state_lock')
def _change_state(self, state):
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py
index 9837ad9..846cc56 100644
--- a/taskflow/engines/action_engine/executor.py
+++ b/taskflow/engines/action_engine/executor.py
@@ -19,8 +19,8 @@ import abc
from concurrent import futures
import six
-from taskflow import failure
from taskflow.utils import async_utils
+from taskflow.utils import misc
from taskflow.utils import threading_utils
# Execution and reversion events.
@@ -35,7 +35,7 @@ def _execute_task(task, arguments, progress_callback):
except Exception:
# NOTE(imelnikov): wrap current exception with Failure
# object and return it.
- result = failure.Failure()
+ result = misc.Failure()
return (task, EXECUTED, result)
@@ -49,7 +49,7 @@ def _revert_task(task, arguments, result, failures, progress_callback):
except Exception:
# NOTE(imelnikov): wrap current exception with Failure
# object and return it.
- result = failure.Failure()
+ result = misc.Failure()
return (task, REVERTED, result)
diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py
index c7d54b8..ae7f901 100644
--- a/taskflow/engines/action_engine/graph_action.py
+++ b/taskflow/engines/action_engine/graph_action.py
@@ -16,10 +16,10 @@
from taskflow.engines.action_engine import executor as ex
from taskflow import exceptions as excp
-from taskflow import failure
from taskflow import retry as r
from taskflow import states as st
from taskflow import task
+from taskflow.utils import misc
_WAITING_TIMEOUT = 60 # in seconds
@@ -77,7 +77,7 @@ class FutureGraphAction(object):
node, event, result = future.result()
if isinstance(node, task.BaseTask):
self._complete_task(node, event, result)
- if isinstance(result, failure.Failure):
+ if isinstance(result, misc.Failure):
if event == ex.EXECUTED:
self._process_atom_failure(node, result)
else:
@@ -88,7 +88,7 @@ class FutureGraphAction(object):
not_done.extend(self._schedule(next_nodes))
if failures:
- failure.Failure.reraise_if_any(failures)
+ misc.Failure.reraise_if_any(failures)
if self._analyzer.get_next_nodes():
return st.SUSPENDED
diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py
index c6ca85b..a860f69 100644
--- a/taskflow/engines/action_engine/retry_action.py
+++ b/taskflow/engines/action_engine/retry_action.py
@@ -18,9 +18,9 @@ import logging
from taskflow.engines.action_engine import executor as ex
from taskflow import exceptions
-from taskflow import failure
from taskflow import states
from taskflow.utils import async_utils
+from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -63,7 +63,7 @@ class RetryAction(object):
try:
result = retry.execute(**kwargs)
except Exception:
- result = failure.Failure()
+ result = misc.Failure()
self.change_state(retry, states.FAILURE, result=result)
else:
self.change_state(retry, states.SUCCESS, result=result)
@@ -79,7 +79,7 @@ class RetryAction(object):
try:
result = retry.revert(**kwargs)
except Exception:
- result = failure.Failure()
+ result = misc.Failure()
self.change_state(retry, states.FAILURE)
else:
self.change_state(retry, states.REVERTED)
diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py
index ac2c9c1..32c0a17 100644
--- a/taskflow/engines/action_engine/task_action.py
+++ b/taskflow/engines/action_engine/task_action.py
@@ -17,8 +17,8 @@
import logging
from taskflow import exceptions
-from taskflow import failure
from taskflow import states
+from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -71,7 +71,7 @@ class TaskAction(object):
self._on_update_progress)
def complete_execution(self, task, result):
- if isinstance(result, failure.Failure):
+ if isinstance(result, misc.Failure):
self.change_state(task, states.FAILURE, result=result)
else:
self.change_state(task, states.SUCCESS,
@@ -91,7 +91,7 @@ class TaskAction(object):
return future
def complete_reversion(self, task, rev_result):
- if isinstance(rev_result, failure.Failure):
+ if isinstance(rev_result, misc.Failure):
self.change_state(task, states.FAILURE)
else:
self.change_state(task, states.REVERTED, progress=1.0)
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index c6b259b..4c4fc06 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -24,7 +24,6 @@ from taskflow.engines.worker_based import cache
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow import exceptions as exc
-from taskflow import failure
from taskflow.utils import async_utils
from taskflow.utils import misc
@@ -126,7 +125,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
"""
LOG.debug("Request '%r' has expired.", request)
LOG.debug("The '%r' request has expired.", request)
- request.set_result(failure.Failure.from_exception(
+ request.set_result(misc.Failure.from_exception(
exc.RequestTimeout("The '%r' request has expired" % request)))
def _on_wait(self):
@@ -162,11 +161,11 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
reply_to=self._uuid,
correlation_id=request.uuid)
except Exception:
- with failure.capture_failure() as fail:
+ with misc.capture_failure() as failure:
LOG.exception("Failed to submit the '%s' request." %
request)
self._requests_cache.delete(request.uuid)
- request.set_result(fail)
+ request.set_result(failure)
def _notify_topics(self):
"""Cyclically publish notify message to each topic."""
diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py
index b8f8939..7085905 100644
--- a/taskflow/engines/worker_based/protocol.py
+++ b/taskflow/engines/worker_based/protocol.py
@@ -21,7 +21,6 @@ import six
from concurrent import futures
from taskflow.engines.action_engine import executor
-from taskflow import failure
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -137,7 +136,7 @@ class Request(Message):
return False
def to_dict(self):
- """Return json-serializable request, converting all `failure.Failure`
+ """Return json-serializable request, converting all `misc.Failure`
objects into dictionaries.
"""
request = dict(task_cls=self._task_cls, task_name=self._task.name,
@@ -145,15 +144,15 @@ class Request(Message):
arguments=self._arguments)
if 'result' in self._kwargs:
result = self._kwargs['result']
- if isinstance(result, failure.Failure):
+ if isinstance(result, misc.Failure):
request['result'] = ('failure', result.to_dict())
else:
request['result'] = ('success', result)
if 'failures' in self._kwargs:
failures = self._kwargs['failures']
request['failures'] = {}
- for task, fail in six.iteritems(failures):
- request['failures'][task] = fail.to_dict()
+ for task, failure in six.iteritems(failures):
+ request['failures'][task] = failure.to_dict()
return request
def set_result(self, result):
@@ -183,7 +182,7 @@ class Response(Message):
state = data['state']
data = data['data']
if state == FAILURE and 'result' in data:
- data['result'] = failure.Failure.from_dict(data['result'])
+ data['result'] = misc.Failure.from_dict(data['result'])
return cls(state, **data)
@property
diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py
index b026c0c..02f5664 100644
--- a/taskflow/engines/worker_based/server.py
+++ b/taskflow/engines/worker_based/server.py
@@ -21,7 +21,7 @@ from kombu import exceptions as kombu_exc
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
-from taskflow import failure
+from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -77,22 +77,21 @@ class Server(object):
@staticmethod
def _parse_request(task_cls, task_name, action, arguments, result=None,
failures=None, **kwargs):
- """Parse request before it can be processed.
-
- All `failure.Failure` objects that have been converted to dict on the
- remote side to be serializable are now converted back to objects.
+ """Parse request before it can be processed. All `misc.Failure` objects
+ that have been converted to dict on the remote side to be serializable
+ are now converted back to objects.
"""
action_args = dict(arguments=arguments, task_name=task_name)
if result is not None:
data_type, data = result
if data_type == 'failure':
- action_args['result'] = failure.Failure.from_dict(data)
+ action_args['result'] = misc.Failure.from_dict(data)
else:
action_args['result'] = data
if failures is not None:
action_args['failures'] = {}
for k, v in failures.items():
- action_args['failures'][k] = failure.Failure.from_dict(v)
+ action_args['failures'][k] = misc.Failure.from_dict(v)
return task_cls, action, action_args
@staticmethod
@@ -162,19 +161,19 @@ class Server(object):
action_args.update(task_uuid=task_uuid,
progress_callback=progress_callback)
except ValueError:
- with failure.capture_failure() as fail:
+ with misc.capture_failure() as failure:
LOG.exception("Failed to parse request")
- reply_callback(result=fail.to_dict())
+ reply_callback(result=failure.to_dict())
return
# get task endpoint
try:
endpoint = self._endpoints[task_cls]
except KeyError:
- with failure.capture_failure() as fail:
+ with misc.capture_failure() as failure:
LOG.exception("The '%s' task endpoint does not exist",
task_cls)
- reply_callback(result=fail.to_dict())
+ reply_callback(result=failure.to_dict())
return
else:
reply_callback(state=pr.RUNNING)
@@ -183,11 +182,11 @@ class Server(object):
try:
result = getattr(endpoint, action)(**action_args)
except Exception:
- with failure.capture_failure() as fail:
+ with misc.capture_failure() as failure:
LOG.exception("The %s task execution failed", endpoint)
- reply_callback(result=fail.to_dict())
+ reply_callback(result=failure.to_dict())
else:
- if isinstance(result, failure.Failure):
+ if isinstance(result, misc.Failure):
reply_callback(result=result.to_dict())
else:
reply_callback(state=pr.SUCCESS, result=result)
diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py
index 843bd11..17ae632 100644
--- a/taskflow/examples/wrapped_exception.py
+++ b/taskflow/examples/wrapped_exception.py
@@ -31,9 +31,10 @@ sys.path.insert(0, top_dir)
import taskflow.engines
from taskflow import exceptions
-from taskflow import failure
from taskflow.patterns import unordered_flow as uf
from taskflow import task
+from taskflow.tests import utils
+from taskflow.utils import misc
# INTRO: In this example we create two tasks which can trigger exceptions
# based on various inputs to show how to analyze the thrown exceptions for
@@ -95,20 +96,20 @@ def run(**store):
SecondTask()
)
try:
- with failure.wrap_all_failures():
+ with utils.wrap_all_failures():
taskflow.engines.run(flow, store=store,
engine_conf='parallel')
except exceptions.WrappedFailure as ex:
unknown_failures = []
- for fail in ex:
- if fail.check(FirstException):
- print("Got FirstException: %s" % fail.exception_str)
- elif fail.check(SecondException):
- print("Got SecondException: %s" % fail.exception_str)
+ for failure in ex:
+ if failure.check(FirstException):
+ print("Got FirstException: %s" % failure.exception_str)
+ elif failure.check(SecondException):
+ print("Got SecondException: %s" % failure.exception_str)
else:
- print("Unknown failure: %s" % fail)
- unknown_failures.append(fail)
- failure.Failure.reraise_if_any(unknown_failures)
+ print("Unknown failure: %s" % failure)
+ unknown_failures.append(failure)
+ misc.Failure.reraise_if_any(unknown_failures)
print_wrapped("Raise and catch first exception only")
diff --git a/taskflow/failure.py b/taskflow/failure.py
deleted file mode 100644
index 5dd825b..0000000
--- a/taskflow/failure.py
+++ /dev/null
@@ -1,231 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2013-2014 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import contextlib
-import sys
-import traceback
-
-import six
-
-from taskflow import exceptions as exc
-from taskflow.utils import misc
-from taskflow.utils import reflection
-
-
-@contextlib.contextmanager
-def wrap_all_failures():
- """Convert any exceptions to WrappedFailure.
-
- When you expect several failures, it may be convenient
- to wrap any exception with WrappedFailure in order to
- unify error handling.
- """
- try:
- yield
- except Exception:
- raise exc.WrappedFailure([Failure()])
-
-
-@contextlib.contextmanager
-def capture_failure():
- """Save current exception, and yield back the failure (or raises a
- runtime error if no active exception is being handled).
-
- In some cases the exception context can be cleared, resulting in None
- being attempted to be saved after an exception handler is run. This
- can happen when eventlet switches greenthreads or when running an
- exception handler, code raises and catches an exception. In both
- cases the exception context will be cleared.
-
- To work around this, we save the exception state, yield a failure and
- then run other code.
-
- For example::
-
- except Exception:
- with capture_failure() as fail:
- LOG.warn("Activating cleanup")
- cleanup()
- save_failure(fail)
- """
- exc_info = sys.exc_info()
- if not any(exc_info):
- raise RuntimeError("No active exception is being handled")
- else:
- yield Failure(exc_info=exc_info)
-
-
-class Failure(object):
- """Object that represents failure.
-
- Failure objects encapsulate exception information so that
- it can be re-used later to re-raise or inspect.
- """
- DICT_VERSION = 1
-
- def __init__(self, exc_info=None, **kwargs):
- if not kwargs:
- if exc_info is None:
- exc_info = sys.exc_info()
- self._exc_info = exc_info
- self._exc_type_names = list(
- reflection.get_all_class_names(exc_info[0], up_to=Exception))
- if not self._exc_type_names:
- raise TypeError('Invalid exception type: %r' % exc_info[0])
- self._exception_str = exc.exception_message(self._exc_info[1])
- self._traceback_str = ''.join(
- traceback.format_tb(self._exc_info[2]))
- else:
- self._exc_info = exc_info # may be None
- self._exception_str = kwargs.pop('exception_str')
- self._exc_type_names = kwargs.pop('exc_type_names', [])
- self._traceback_str = kwargs.pop('traceback_str', None)
- if kwargs:
- raise TypeError(
- 'Failure.__init__ got unexpected keyword argument(s): %s'
- % ', '.join(six.iterkeys(kwargs)))
-
- @classmethod
- def from_exception(cls, exception):
- return cls((type(exception), exception, None))
-
- def _matches(self, other):
- if self is other:
- return True
- return (self._exc_type_names == other._exc_type_names
- and self.exception_str == other.exception_str
- and self.traceback_str == other.traceback_str)
-
- def matches(self, other):
- if not isinstance(other, Failure):
- return False
- if self.exc_info is None or other.exc_info is None:
- return self._matches(other)
- else:
- return self == other
-
- def __eq__(self, other):
- if not isinstance(other, Failure):
- return NotImplemented
- return (self._matches(other) and
- misc.are_equal_exc_info_tuples(self.exc_info, other.exc_info))
-
- def __ne__(self, other):
- return not (self == other)
-
- # NOTE(imelnikov): obj.__hash__() should return same values for equal
- # objects, so we should redefine __hash__. Failure equality semantics
- # is a bit complicated, so for now we just mark Failure objects as
- # unhashable. See python docs on object.__hash__ for more info:
- # http://docs.python.org/2/reference/datamodel.html#object.__hash__
- __hash__ = None
-
- @property
- def exception(self):
- """Exception value, or None if exception value is not present.
-
- Exception value may be lost during serialization.
- """
- if self._exc_info:
- return self._exc_info[1]
- else:
- return None
-
- @property
- def exception_str(self):
- """String representation of exception."""
- return self._exception_str
-
- @property
- def exc_info(self):
- """Exception info tuple or None."""
- return self._exc_info
-
- @property
- def traceback_str(self):
- """Exception traceback as string."""
- return self._traceback_str
-
- @staticmethod
- def reraise_if_any(failures):
- """Re-raise exceptions if argument is not empty.
-
- If argument is empty list, this method returns None. If
- argument is list with single Failure object in it,
- this failure is reraised. Else, WrappedFailure exception
- is raised with failures list as causes.
- """
- failures = list(failures)
- if len(failures) == 1:
- failures[0].reraise()
- elif len(failures) > 1:
- raise exc.WrappedFailure(failures)
-
- def reraise(self):
- """Re-raise captured exception."""
- if self._exc_info:
- six.reraise(*self._exc_info)
- else:
- raise exc.WrappedFailure([self])
-
- def check(self, *exc_classes):
- """Check if any of exc_classes caused the failure.
-
- Arguments of this method can be exception types or type
- names (stings). If captured exception is instance of
- exception of given type, the corresponding argument is
- returned. Else, None is returned.
- """
- for cls in exc_classes:
- if isinstance(cls, type):
- err = reflection.get_class_name(cls)
- else:
- err = cls
- if err in self._exc_type_names:
- return cls
- return None
-
- def __str__(self):
- return 'Failure: %s: %s' % (self._exc_type_names[0],
- self._exception_str)
-
- def __iter__(self):
- """Iterate over exception type names."""
- for et in self._exc_type_names:
- yield et
-
- @classmethod
- def from_dict(cls, data):
- data = dict(data)
- version = data.pop('version', None)
- if version != cls.DICT_VERSION:
- raise ValueError('Invalid dict version of failure object: %r'
- % version)
- return cls(**data)
-
- def to_dict(self):
- return {
- 'exception_str': self.exception_str,
- 'traceback_str': self.traceback_str,
- 'exc_type_names': list(self),
- 'version': self.DICT_VERSION,
- }
-
- def copy(self):
- return Failure(exc_info=misc.copy_exc_info(self.exc_info),
- exception_str=self.exception_str,
- traceback_str=self.traceback_str,
- exc_type_names=self._exc_type_names[:])
diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py
index 9c601d4..e8f1674 100644
--- a/taskflow/listeners/base.py
+++ b/taskflow/listeners/base.py
@@ -21,7 +21,6 @@ import logging
import six
-from taskflow import failure
from taskflow.openstack.common import excutils
from taskflow import states
from taskflow.utils import misc
@@ -142,7 +141,7 @@ class LoggingBase(ListenerBase):
result = details.get('result')
exc_info = None
was_failure = False
- if isinstance(result, failure.Failure):
+ if isinstance(result, misc.Failure):
if result.exc_info:
exc_info = tuple(result.exc_info)
was_failure = True
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index 248c545..2067ff5 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -31,7 +31,6 @@ from sqlalchemy import orm as sa_orm
from sqlalchemy import pool as sa_pool
from taskflow import exceptions as exc
-from taskflow import failure
from taskflow.persistence.backends import base
from taskflow.persistence.backends.sqlalchemy import migration
from taskflow.persistence.backends.sqlalchemy import models
@@ -268,7 +267,7 @@ class Connection(base.Connection):
pass
except sa_exc.OperationalError as ex:
if _is_db_connection_error(six.text_type(ex.args[0])):
- failures.append(failure.Failure())
+ failures.append(misc.Failure())
return False
return True
@@ -514,7 +513,7 @@ def _convert_ad_to_external(ad):
# to change the internal sqlalchemy model easily by forcing a defined
# interface (that isn't the sqlalchemy model itself).
atom_cls = logbook.atom_detail_class(ad.atom_type)
- result = atom_cls.from_dict({
+ return atom_cls.from_dict({
'state': ad.state,
'intention': ad.intention,
'results': ad.results,
@@ -524,7 +523,6 @@ def _convert_ad_to_external(ad):
'name': ad.name,
'uuid': ad.uuid,
})
- return result
def _convert_lb_to_external(lb_m):
diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py
index 4dece26..f378b67 100644
--- a/taskflow/persistence/logbook.py
+++ b/taskflow/persistence/logbook.py
@@ -21,10 +21,10 @@ import logging
import six
-from taskflow import failure
from taskflow.openstack.common import timeutils
from taskflow.openstack.common import uuidutils
from taskflow import states
+from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -311,11 +311,11 @@ class AtomDetail(object):
def _to_dict_shared(self):
if self.failure:
- fail = self.failure.to_dict()
+ failure = self.failure.to_dict()
else:
- fail = None
+ failure = None
return {
- 'failure': fail,
+ 'failure': failure,
'meta': self.meta,
'name': self.name,
'results': self.results,
@@ -331,9 +331,9 @@ class AtomDetail(object):
self.results = data.get('results')
self.version = data.get('version')
self.meta = _fix_meta(data)
- fail = data.get('failure')
- if fail:
- self.failure = failure.Failure.from_dict(fail)
+ failure = data.get('failure')
+ if failure:
+ self.failure = misc.Failure.from_dict(failure)
@property
def uuid(self):
@@ -405,7 +405,7 @@ class RetryDetail(AtomDetail):
for (data, failures) in results:
new_failures = {}
for (key, failure_data) in six.iteritems(failures):
- new_failures[key] = failure.Failure.from_dict(failure_data)
+ new_failures[key] = misc.Failure.from_dict(failure_data)
new_results.append((data, new_failures))
return new_results
@@ -423,8 +423,8 @@ class RetryDetail(AtomDetail):
new_results = []
for (data, failures) in results:
new_failures = {}
- for (key, fail) in six.iteritems(failures):
- new_failures[key] = fail.to_dict()
+ for (key, failure) in six.iteritems(failures):
+ new_failures[key] = failure.to_dict()
new_results.append((data, new_failures))
return new_results
@@ -443,11 +443,11 @@ class RetryDetail(AtomDetail):
# contain tracebacks, which are not copyable.
for (data, failures) in other.results:
copied_failures = {}
- for (key, fail) in six.iteritems(failures):
+ for (key, failure) in six.iteritems(failures):
if deep_copy:
- copied_failures[key] = fail.copy()
+ copied_failures[key] = failure.copy()
else:
- copied_failures[key] = fail
+ copied_failures[key] = failure
results.append((data, copied_failures))
self.results = results
return self
diff --git a/taskflow/storage.py b/taskflow/storage.py
index 23c90d5..dca870f 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -21,7 +21,6 @@ import logging
import six
from taskflow import exceptions
-from taskflow import failure
from taskflow.openstack.common import uuidutils
from taskflow.persistence import logbook
from taskflow import states
@@ -54,7 +53,7 @@ class Storage(object):
self._lock = self._lock_cls()
# NOTE(imelnikov): failure serialization looses information,
- # so we cache failures here, in task name -> failure.Failure mapping.
+ # so we cache failures here, in task name -> misc.Failure mapping.
self._failures = {}
for ad in self._flowdetail:
if ad.failure is not None:
@@ -326,7 +325,7 @@ class Storage(object):
with self._lock.write_lock():
ad = self._atomdetail_by_name(atom_name)
ad.state = state
- if state == states.FAILURE and isinstance(data, failure.Failure):
+ if state == states.FAILURE and isinstance(data, misc.Failure):
# FIXME(harlowja): this seems like it should be internal logic
# in the atom detail object and not in here. Fix that soon...
#
diff --git a/taskflow/test.py b/taskflow/test.py
index 0ecbce4..ce99a37 100644
--- a/taskflow/test.py
+++ b/taskflow/test.py
@@ -24,7 +24,7 @@ import fixtures
import six
from taskflow import exceptions
-from taskflow import failure
+from taskflow.tests import utils
from taskflow.utils import misc
@@ -49,13 +49,13 @@ class FailureRegexpMatcher(object):
self.exc_class = exc_class
self.pattern = pattern
- def match(self, fail):
- for cause in fail:
+ def match(self, failure):
+ for cause in failure:
if cause.check(self.exc_class) is not None:
return matchers.MatchesRegex(
self.pattern).match(cause.exception_str)
return matchers.Mismatch("The `%s` wasn't caused by the `%s`" %
- (fail, self.exc_class))
+ (failure, self.exc_class))
class ItemsEqual(object):
@@ -171,7 +171,7 @@ class TestCase(testcase.TestCase):
string matches to the given pattern.
"""
try:
- with failure.wrap_all_failures():
+ with utils.wrap_all_failures():
callable_obj(*args, **kwargs)
except exceptions.WrappedFailure as e:
self.assertThat(e, FailureRegexpMatcher(exc_class, pattern))
diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py
index ca5a2db..3d28695 100644
--- a/taskflow/tests/unit/persistence/base.py
+++ b/taskflow/tests/unit/persistence/base.py
@@ -17,10 +17,10 @@
import contextlib
from taskflow import exceptions as exc
-from taskflow import failure
from taskflow.openstack.common import uuidutils
from taskflow.persistence import logbook
from taskflow import states
+from taskflow.utils import misc
class PersistenceTestMixin(object):
@@ -147,7 +147,7 @@ class PersistenceTestMixin(object):
try:
raise RuntimeError('Woot!')
except Exception:
- td.failure = failure.Failure()
+ td.failure = misc.Failure()
fd.add(td)
@@ -161,10 +161,10 @@ class PersistenceTestMixin(object):
lb2 = conn.get_logbook(lb_id)
fd2 = lb2.find(fd.uuid)
td2 = fd2.find(td.uuid)
- fail = td2.failure
- self.assertEqual(fail.exception_str, 'Woot!')
- self.assertIs(fail.check(RuntimeError), RuntimeError)
- self.assertEqual(fail.traceback_str, td.failure.traceback_str)
+ failure = td2.failure
+ self.assertEqual(failure.exception_str, 'Woot!')
+ self.assertIs(failure.check(RuntimeError), RuntimeError)
+ self.assertEqual(failure.traceback_str, td.failure.traceback_str)
self.assertIsInstance(td2, logbook.TaskDetail)
def test_logbook_merge_flow_detail(self):
@@ -269,7 +269,7 @@ class PersistenceTestMixin(object):
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
lb.add(fd)
rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
- fail = failure.Failure.from_exception(RuntimeError('fail'))
+ fail = misc.Failure.from_exception(RuntimeError('fail'))
rd.results.append((42, {'some-task': fail}))
fd.add(rd)
@@ -286,7 +286,7 @@ class PersistenceTestMixin(object):
rd2 = fd2.find(rd.uuid)
self.assertIsInstance(rd2, logbook.RetryDetail)
fail2 = rd2.results[0][1].get('some-task')
- self.assertIsInstance(fail2, failure.Failure)
+ self.assertIsInstance(fail2, misc.Failure)
self.assertTrue(fail.matches(fail2))
def test_retry_detail_save_intention(self):
diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py
index 26bbcd9..d2401f4 100644
--- a/taskflow/tests/unit/test_action_engine.py
+++ b/taskflow/tests/unit/test_action_engine.py
@@ -31,7 +31,6 @@ from taskflow.engines.action_engine import engine as eng
from taskflow.engines.worker_based import engine as w_eng
from taskflow.engines.worker_based import worker as wkr
from taskflow import exceptions as exc
-from taskflow import failure
from taskflow.persistence import logbook
from taskflow import states
from taskflow import task
@@ -39,6 +38,7 @@ from taskflow import test
from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu
+from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
@@ -488,7 +488,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase):
self.assertEqual(result, 'RESULT')
self.assertEqual(list(flow_failures.keys()), ['fail1'])
fail = flow_failures['fail1']
- self.assertIsInstance(fail, failure.Failure)
+ self.assertIsInstance(fail, misc.Failure)
self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!')
flow = lf.Flow('test').add(
diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py
index 44874f1..42f4a5f 100644
--- a/taskflow/tests/unit/test_retries.py
+++ b/taskflow/tests/unit/test_retries.py
@@ -21,11 +21,11 @@ from taskflow.patterns import unordered_flow as uf
import taskflow.engines
from taskflow import exceptions as exc
-from taskflow import failure
from taskflow import retry
from taskflow import states as st
from taskflow import test
from taskflow.tests import utils
+from taskflow.utils import misc
class RetryTest(utils.EngineTestBase):
@@ -558,7 +558,7 @@ class RetryTest(utils.EngineTestBase):
# we execute retry
engine.storage.save('flow-1_retry', 1)
# task fails
- fail = failure.Failure.from_exception(RuntimeError('foo')),
+ fail = misc.Failure.from_exception(RuntimeError('foo')),
engine.storage.save('task1', fail, state=st.FAILURE)
if when == 'task fails':
return engine
@@ -634,7 +634,7 @@ class RetryTest(utils.EngineTestBase):
self._make_engine(flow).run)
self.assertEqual(len(r.history), 1)
self.assertEqual(r.history[0][1], {})
- self.assertEqual(isinstance(r.history[0][0], failure.Failure), True)
+ self.assertEqual(isinstance(r.history[0][0], misc.Failure), True)
def test_retry_revert_fails(self):
@@ -690,7 +690,7 @@ class RetryTest(utils.EngineTestBase):
engine.storage.save('test2_retry', 1)
engine.storage.save('b', 11)
# pretend that 'c' failed
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
+ fail = misc.Failure.from_exception(RuntimeError('Woot!'))
engine.storage.save('c', fail, st.FAILURE)
engine.run()
diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py
index 2c6fcd2..eb08819 100644
--- a/taskflow/tests/unit/test_storage.py
+++ b/taskflow/tests/unit/test_storage.py
@@ -20,13 +20,13 @@ import threading
import mock
from taskflow import exceptions
-from taskflow import failure
from taskflow.openstack.common import uuidutils
from taskflow.persistence import backends
from taskflow.persistence import logbook
from taskflow import states
from taskflow import storage
from taskflow import test
+from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
@@ -127,46 +127,46 @@ class StorageTestMixin(object):
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
def test_save_and_get_cached_failure(self):
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
+ failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
- s.save('my task', fail, states.FAILURE)
- self.assertEqual(s.get('my task'), fail)
+ s.save('my task', failure, states.FAILURE)
+ self.assertEqual(s.get('my task'), failure)
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
self.assertTrue(s.has_failures())
- self.assertEqual(s.get_failures(), {'my task': fail})
+ self.assertEqual(s.get_failures(), {'my task': failure})
def test_save_and_get_non_cached_failure(self):
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
+ failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
- s.save('my task', fail, states.FAILURE)
- self.assertEqual(s.get('my task'), fail)
+ s.save('my task', failure, states.FAILURE)
+ self.assertEqual(s.get('my task'), failure)
s._failures['my task'] = None
- self.assertTrue(fail.matches(s.get('my task')))
+ self.assertTrue(failure.matches(s.get('my task')))
def test_get_failure_from_reverted_task(self):
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
+ failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
- s.save('my task', fail, states.FAILURE)
+ s.save('my task', failure, states.FAILURE)
s.set_atom_state('my task', states.REVERTING)
- self.assertEqual(s.get('my task'), fail)
+ self.assertEqual(s.get('my task'), failure)
s.set_atom_state('my task', states.REVERTED)
- self.assertEqual(s.get('my task'), fail)
+ self.assertEqual(s.get('my task'), failure)
def test_get_failure_after_reload(self):
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
+ failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
- s.save('my task', fail, states.FAILURE)
+ s.save('my task', failure, states.FAILURE)
s2 = self._get_storage(s._flowdetail)
self.assertTrue(s2.has_failures())
self.assertEqual(1, len(s2.get_failures()))
- self.assertTrue(fail.matches(s2.get('my task')))
+ self.assertTrue(failure.matches(s2.get('my task')))
self.assertEqual(s2.get_atom_state('my task'), states.FAILURE)
def test_get_non_existing_var(self):
@@ -483,15 +483,15 @@ class StorageTestMixin(object):
self.assertEqual(s.fetch_all(), {})
def test_cached_retry_failure(self):
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
+ failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_retry('my retry', result_mapping={'x': 0})
s.save('my retry', 'a')
- s.save('my retry', fail, states.FAILURE)
+ s.save('my retry', failure, states.FAILURE)
history = s.get_retry_history('my retry')
- self.assertEqual(history, [('a', {}), (fail, {})])
+ self.assertEqual(history, [('a', {}), (failure, {})])
self.assertIs(s.has_failures(), True)
- self.assertEqual(s.get_failures(), {'my retry': fail})
+ self.assertEqual(s.get_failures(), {'my retry': failure})
def test_logbook_get_unknown_atom_type(self):
self.assertRaisesRegexp(TypeError,
diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py
index 5923dc4..1c5c197 100644
--- a/taskflow/tests/unit/test_utils.py
+++ b/taskflow/tests/unit/test_utils.py
@@ -19,7 +19,6 @@ import functools
import sys
import time
-from taskflow import failure
from taskflow import states
from taskflow import test
from taskflow.tests import utils as test_utils
@@ -285,8 +284,8 @@ class GetClassNameTest(test.TestCase):
self.assertEqual(name, 'RuntimeError')
def test_global_class(self):
- name = reflection.get_class_name(failure.Failure)
- self.assertEqual(name, 'taskflow.failure.Failure')
+ name = reflection.get_class_name(misc.Failure)
+ self.assertEqual(name, 'taskflow.utils.misc.Failure')
def test_class(self):
name = reflection.get_class_name(Class)
diff --git a/taskflow/tests/unit/test_utils_failure.py b/taskflow/tests/unit/test_utils_failure.py
index be42129..394abfd 100644
--- a/taskflow/tests/unit/test_utils_failure.py
+++ b/taskflow/tests/unit/test_utils_failure.py
@@ -20,14 +20,14 @@ from taskflow import exceptions
from taskflow import test
from taskflow.tests import utils as test_utils
-from taskflow import failure
+from taskflow.utils import misc
def _captured_failure(msg):
try:
raise RuntimeError(msg)
except Exception:
- return failure.Failure()
+ return misc.Failure()
class GeneralFailureObjTestsMixin(object):
@@ -82,9 +82,9 @@ class ReCreatedFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin):
def setUp(self):
super(ReCreatedFailureTestCase, self).setUp()
fail_obj = _captured_failure('Woot!')
- self.fail_obj = failure.Failure(exception_str=fail_obj.exception_str,
- traceback_str=fail_obj.traceback_str,
- exc_type_names=list(fail_obj))
+ self.fail_obj = misc.Failure(exception_str=fail_obj.exception_str,
+ traceback_str=fail_obj.traceback_str,
+ exc_type_names=list(fail_obj))
def test_value_lost(self):
self.assertIs(self.fail_obj.exception, None)
@@ -102,7 +102,7 @@ class FromExceptionTestCase(test.TestCase, GeneralFailureObjTestsMixin):
def setUp(self):
super(FromExceptionTestCase, self).setUp()
- self.fail_obj = failure.Failure.from_exception(RuntimeError('Woot!'))
+ self.fail_obj = misc.Failure.from_exception(RuntimeError('Woot!'))
class FailureObjectTestCase(test.TestCase):
@@ -111,10 +111,10 @@ class FailureObjectTestCase(test.TestCase):
try:
raise SystemExit()
except BaseException:
- self.assertRaises(TypeError, failure.Failure)
+ self.assertRaises(TypeError, misc.Failure)
def test_unknown_argument(self):
- exc = self.assertRaises(TypeError, failure.Failure,
+ exc = self.assertRaises(TypeError, misc.Failure,
exception_str='Woot!',
traceback_str=None,
exc_type_names=['Exception'],
@@ -123,12 +123,12 @@ class FailureObjectTestCase(test.TestCase):
self.assertEqual(str(exc), expected)
def test_empty_does_not_reraise(self):
- self.assertIs(failure.Failure.reraise_if_any([]), None)
+ self.assertIs(misc.Failure.reraise_if_any([]), None)
def test_reraises_one(self):
fls = [_captured_failure('Woot!')]
self.assertRaisesRegexp(RuntimeError, '^Woot!$',
- failure.Failure.reraise_if_any, fls)
+ misc.Failure.reraise_if_any, fls)
def test_reraises_several(self):
fls = [
@@ -136,7 +136,7 @@ class FailureObjectTestCase(test.TestCase):
_captured_failure('Oh, not again!')
]
exc = self.assertRaises(exceptions.WrappedFailure,
- failure.Failure.reraise_if_any, fls)
+ misc.Failure.reraise_if_any, fls)
self.assertEqual(list(exc), fls)
def test_failure_copy(self):
@@ -149,9 +149,9 @@ class FailureObjectTestCase(test.TestCase):
def test_failure_copy_recaptured(self):
captured = _captured_failure('Woot!')
- fail_obj = failure.Failure(exception_str=captured.exception_str,
- traceback_str=captured.traceback_str,
- exc_type_names=list(captured))
+ fail_obj = misc.Failure(exception_str=captured.exception_str,
+ traceback_str=captured.traceback_str,
+ exc_type_names=list(captured))
copied = fail_obj.copy()
self.assertIsNot(fail_obj, copied)
self.assertEqual(fail_obj, copied)
@@ -160,9 +160,9 @@ class FailureObjectTestCase(test.TestCase):
def test_recaptured_not_eq(self):
captured = _captured_failure('Woot!')
- fail_obj = failure.Failure(exception_str=captured.exception_str,
- traceback_str=captured.traceback_str,
- exc_type_names=list(captured))
+ fail_obj = misc.Failure(exception_str=captured.exception_str,
+ traceback_str=captured.traceback_str,
+ exc_type_names=list(captured))
self.assertFalse(fail_obj == captured)
self.assertTrue(fail_obj != captured)
self.assertTrue(fail_obj.matches(captured))
@@ -174,13 +174,13 @@ class FailureObjectTestCase(test.TestCase):
def test_two_recaptured_neq(self):
captured = _captured_failure('Woot!')
- fail_obj = failure.Failure(exception_str=captured.exception_str,
- traceback_str=captured.traceback_str,
- exc_type_names=list(captured))
+ fail_obj = misc.Failure(exception_str=captured.exception_str,
+ traceback_str=captured.traceback_str,
+ exc_type_names=list(captured))
new_exc_str = captured.exception_str.replace('Woot', 'w00t')
- fail_obj2 = failure.Failure(exception_str=new_exc_str,
- traceback_str=captured.traceback_str,
- exc_type_names=list(captured))
+ fail_obj2 = misc.Failure(exception_str=new_exc_str,
+ traceback_str=captured.traceback_str,
+ exc_type_names=list(captured))
self.assertNotEqual(fail_obj, fail_obj2)
self.assertFalse(fail_obj2.matches(fail_obj))
@@ -220,7 +220,7 @@ class WrappedFailureTestCase(test.TestCase):
try:
raise exceptions.WrappedFailure([f1, f2])
except Exception:
- fail_obj = failure.Failure()
+ fail_obj = misc.Failure()
wf = exceptions.WrappedFailure([fail_obj, f3])
self.assertEqual(list(wf), [f1, f2, f3])
@@ -230,13 +230,13 @@ class NonAsciiExceptionsTestCase(test.TestCase):
def test_exception_with_non_ascii_str(self):
bad_string = chr(200)
- fail = failure.Failure.from_exception(ValueError(bad_string))
+ fail = misc.Failure.from_exception(ValueError(bad_string))
self.assertEqual(fail.exception_str, bad_string)
self.assertEqual(str(fail), 'Failure: ValueError: %s' % bad_string)
def test_exception_non_ascii_unicode(self):
hi_ru = u'привет'
- fail = failure.Failure.from_exception(ValueError(hi_ru))
+ fail = misc.Failure.from_exception(ValueError(hi_ru))
self.assertEqual(fail.exception_str, hi_ru)
self.assertIsInstance(fail.exception_str, six.text_type)
self.assertEqual(six.text_type(fail),
@@ -246,7 +246,7 @@ class NonAsciiExceptionsTestCase(test.TestCase):
hi_cn = u'嗨'
fail = ValueError(hi_cn)
self.assertEqual(hi_cn, exceptions.exception_message(fail))
- fail = failure.Failure.from_exception(fail)
+ fail = misc.Failure.from_exception(fail)
wrapped_fail = exceptions.WrappedFailure([fail])
if six.PY2:
# Python 2.x will unicode escape it, while python 3.3+ will not,
@@ -261,12 +261,12 @@ class NonAsciiExceptionsTestCase(test.TestCase):
def test_failure_equality_with_non_ascii_str(self):
bad_string = chr(200)
- fail = failure.Failure.from_exception(ValueError(bad_string))
+ fail = misc.Failure.from_exception(ValueError(bad_string))
copied = fail.copy()
self.assertEqual(fail, copied)
def test_failure_equality_non_ascii_unicode(self):
hi_ru = u'привет'
- fail = failure.Failure.from_exception(ValueError(hi_ru))
+ fail = misc.Failure.from_exception(ValueError(hi_ru))
copied = fail.copy()
self.assertEqual(fail, copied)
diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py
index fc162e9..7509200 100644
--- a/taskflow/tests/unit/worker_based/test_executor.py
+++ b/taskflow/tests/unit/worker_based/test_executor.py
@@ -23,9 +23,9 @@ from kombu import exceptions as kombu_exc
from taskflow.engines.worker_based import executor
from taskflow.engines.worker_based import protocol as pr
-from taskflow import failure
from taskflow import test
from taskflow.tests import utils
+from taskflow.utils import misc
class TestWorkerTaskExecutor(test.MockTestCase):
@@ -111,8 +111,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()])
def test_on_message_response_state_failure(self):
- fail = failure.Failure.from_exception(Exception('test'))
- failure_dict = fail.to_dict()
+ failure = misc.Failure.from_exception(Exception('test'))
+ failure_dict = failure.to_dict()
response = pr.Response(pr.FAILURE, result=failure_dict)
ex = self.executor()
ex._requests_cache.set(self.task_uuid, self.request_inst_mock)
@@ -120,7 +120,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.assertEqual(len(ex._requests_cache._data), 0)
self.assertEqual(self.request_inst_mock.mock_calls, [
- mock.call.set_result(result=utils.FailureMatcher(fail))
+ mock.call.set_result(result=utils.FailureMatcher(failure))
])
self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()])
diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py
index ea18ab9..27d6e00 100644
--- a/taskflow/tests/unit/worker_based/test_protocol.py
+++ b/taskflow/tests/unit/worker_based/test_protocol.py
@@ -19,9 +19,9 @@ import mock
from concurrent import futures
from taskflow.engines.worker_based import protocol as pr
-from taskflow import failure
from taskflow import test
from taskflow.tests import utils
+from taskflow.utils import misc
class TestProtocol(test.TestCase):
@@ -81,15 +81,15 @@ class TestProtocol(test.TestCase):
self.request_to_dict(result=('success', None)))
def test_to_dict_with_result_failure(self):
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
- expected = self.request_to_dict(result=('failure', fail.to_dict()))
- self.assertEqual(self.request(result=fail).to_dict(), expected)
+ failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+ expected = self.request_to_dict(result=('failure', failure.to_dict()))
+ self.assertEqual(self.request(result=failure).to_dict(), expected)
def test_to_dict_with_failures(self):
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
- request = self.request(failures={self.task.name: fail})
+ failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+ request = self.request(failures={self.task.name: failure})
expected = self.request_to_dict(
- failures={self.task.name: fail.to_dict()})
+ failures={self.task.name: failure.to_dict()})
self.assertEqual(request.to_dict(), expected)
@mock.patch('taskflow.engines.worker_based.protocol.misc.wallclock')
diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py
index b155d77..a4eab7a 100644
--- a/taskflow/tests/unit/worker_based/test_server.py
+++ b/taskflow/tests/unit/worker_based/test_server.py
@@ -23,9 +23,9 @@ from kombu import exceptions as exc
from taskflow.engines.worker_based import endpoint as ep
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import server
-from taskflow import failure
from taskflow import test
from taskflow.tests import utils
+from taskflow.utils import misc
class TestServer(test.MockTestCase):
@@ -185,19 +185,19 @@ class TestServer(test.MockTestCase):
result=1)))
def test_parse_request_with_failure_result(self):
- fail = failure.Failure.from_exception(Exception('test'))
- request = self.make_request(action='revert', result=fail)
+ failure = misc.Failure.from_exception(Exception('test'))
+ request = self.make_request(action='revert', result=failure)
task_cls, action, task_args = server.Server._parse_request(**request)
self.assertEqual((task_cls, action, task_args),
(self.task.name, 'revert',
dict(task_name=self.task.name,
arguments=self.task_args,
- result=utils.FailureMatcher(fail))))
+ result=utils.FailureMatcher(failure))))
def test_parse_request_with_failures(self):
- failures = {'0': failure.Failure.from_exception(Exception('test1')),
- '1': failure.Failure.from_exception(Exception('test2'))}
+ failures = {'0': misc.Failure.from_exception(Exception('test1')),
+ '1': misc.Failure.from_exception(Exception('test2'))}
request = self.make_request(action='revert', failures=failures)
task_cls, action, task_args = server.Server._parse_request(**request)
@@ -274,16 +274,16 @@ class TestServer(test.MockTestCase):
self.assertEqual(self.master_mock.mock_calls, [])
self.assertTrue(mocked_exception.called)
- @mock.patch.object(failure.Failure, 'from_dict')
- @mock.patch.object(failure.Failure, 'to_dict')
+ @mock.patch.object(misc.Failure, 'from_dict')
+ @mock.patch.object(misc.Failure, 'to_dict')
def test_process_request_parse_request_failure(self, to_mock, from_mock):
failure_dict = {
'failure': 'failure',
}
- fail = failure.Failure.from_exception(RuntimeError('Woot!'))
+ failure = misc.Failure.from_exception(RuntimeError('Woot!'))
to_mock.return_value = failure_dict
from_mock.side_effect = ValueError('Woot!')
- request = self.make_request(result=fail)
+ request = self.make_request(result=failure)
# create server and process request
s = self.server(reset_master_mock=True)
@@ -298,7 +298,7 @@ class TestServer(test.MockTestCase):
]
self.assertEqual(master_mock_calls, self.master_mock.mock_calls)
- @mock.patch.object(failure.Failure, 'to_dict')
+ @mock.patch.object(misc.Failure, 'to_dict')
def test_process_request_endpoint_not_found(self, to_mock):
failure_dict = {
'failure': 'failure',
@@ -319,7 +319,7 @@ class TestServer(test.MockTestCase):
]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
- @mock.patch.object(failure.Failure, 'to_dict')
+ @mock.patch.object(misc.Failure, 'to_dict')
def test_process_request_execution_failure(self, to_mock):
failure_dict = {
'failure': 'failure',
@@ -344,7 +344,7 @@ class TestServer(test.MockTestCase):
]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
- @mock.patch.object(failure.Failure, 'to_dict')
+ @mock.patch.object(misc.Failure, 'to_dict')
def test_process_request_task_failure(self, to_mock):
failure_dict = {
'failure': 'failure',
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 5ed9b75..20fc375 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -19,15 +19,31 @@ import threading
import six
+from taskflow import exceptions
from taskflow.persistence.backends import impl_memory
from taskflow import retry
from taskflow import task
+from taskflow.utils import misc
ARGS_KEY = '__args__'
KWARGS_KEY = '__kwargs__'
ORDER_KEY = '__order__'
+@contextlib.contextmanager
+def wrap_all_failures():
+ """Convert any exceptions to WrappedFailure.
+
+ When you expect several failures, it may be convenient
+ to wrap any exception with WrappedFailure in order to
+ unify error handling.
+ """
+ try:
+ yield
+ except Exception:
+ raise exceptions.WrappedFailure([misc.Failure()])
+
+
class DummyTask(task.Task):
def execute(self, context, *args, **kwargs):
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index 1e8bcb1..1da28a4 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright (C) 2012-2014 Yahoo! Inc. All Rights Reserved.
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -16,6 +16,7 @@
# under the License.
import collections
+import contextlib
import copy
import errno
import functools
@@ -23,6 +24,7 @@ import keyword
import logging
import os
import string
+import sys
import threading
import time
import traceback
@@ -511,3 +513,195 @@ def are_equal_exc_info_tuples(ei1, ei2):
tb1 = traceback.format_tb(ei1[2])
tb2 = traceback.format_tb(ei2[2])
return tb1 == tb2
+
+
+@contextlib.contextmanager
+def capture_failure():
+ """Save current exception, and yield back the failure (or raises a
+ runtime error if no active exception is being handled).
+
+ In some cases the exception context can be cleared, resulting in None
+ being attempted to be saved after an exception handler is run. This
+ can happen when eventlet switches greenthreads or when running an
+ exception handler, code raises and catches an exception. In both
+ cases the exception context will be cleared.
+
+ To work around this, we save the exception state, yield a failure and
+ then run other code.
+
+ For example::
+
+ except Exception:
+ with capture_failure() as fail:
+ LOG.warn("Activating cleanup")
+ cleanup()
+ save_failure(fail)
+ """
+ exc_info = sys.exc_info()
+ if not any(exc_info):
+ raise RuntimeError("No active exception is being handled")
+ else:
+ yield Failure(exc_info=exc_info)
+
+
+class Failure(object):
+ """Object that represents failure.
+
+ Failure objects encapsulate exception information so that
+ it can be re-used later to re-raise or inspect.
+ """
+ DICT_VERSION = 1
+
+ def __init__(self, exc_info=None, **kwargs):
+ if not kwargs:
+ if exc_info is None:
+ exc_info = sys.exc_info()
+ self._exc_info = exc_info
+ self._exc_type_names = list(
+ reflection.get_all_class_names(exc_info[0], up_to=Exception))
+ if not self._exc_type_names:
+ raise TypeError('Invalid exception type: %r' % exc_info[0])
+ self._exception_str = exc.exception_message(self._exc_info[1])
+ self._traceback_str = ''.join(
+ traceback.format_tb(self._exc_info[2]))
+ else:
+ self._exc_info = exc_info # may be None
+ self._exception_str = kwargs.pop('exception_str')
+ self._exc_type_names = kwargs.pop('exc_type_names', [])
+ self._traceback_str = kwargs.pop('traceback_str', None)
+ if kwargs:
+ raise TypeError(
+ 'Failure.__init__ got unexpected keyword argument(s): %s'
+ % ', '.join(six.iterkeys(kwargs)))
+
+ @classmethod
+ def from_exception(cls, exception):
+ return cls((type(exception), exception, None))
+
+ def _matches(self, other):
+ if self is other:
+ return True
+ return (self._exc_type_names == other._exc_type_names
+ and self.exception_str == other.exception_str
+ and self.traceback_str == other.traceback_str)
+
+ def matches(self, other):
+ if not isinstance(other, Failure):
+ return False
+ if self.exc_info is None or other.exc_info is None:
+ return self._matches(other)
+ else:
+ return self == other
+
+ def __eq__(self, other):
+ if not isinstance(other, Failure):
+ return NotImplemented
+ return (self._matches(other) and
+ are_equal_exc_info_tuples(self.exc_info, other.exc_info))
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ # NOTE(imelnikov): obj.__hash__() should return same values for equal
+ # objects, so we should redefine __hash__. Failure equality semantics
+ # is a bit complicated, so for now we just mark Failure objects as
+ # unhashable. See python docs on object.__hash__ for more info:
+ # http://docs.python.org/2/reference/datamodel.html#object.__hash__
+ __hash__ = None
+
+ @property
+ def exception(self):
+ """Exception value, or None if exception value is not present.
+
+ Exception value may be lost during serialization.
+ """
+ if self._exc_info:
+ return self._exc_info[1]
+ else:
+ return None
+
+ @property
+ def exception_str(self):
+ """String representation of exception."""
+ return self._exception_str
+
+ @property
+ def exc_info(self):
+ """Exception info tuple or None."""
+ return self._exc_info
+
+ @property
+ def traceback_str(self):
+ """Exception traceback as string."""
+ return self._traceback_str
+
+ @staticmethod
+ def reraise_if_any(failures):
+ """Re-raise exceptions if argument is not empty.
+
+ If argument is empty list, this method returns None. If
+ argument is list with single Failure object in it,
+ this failure is reraised. Else, WrappedFailure exception
+ is raised with failures list as causes.
+ """
+ failures = list(failures)
+ if len(failures) == 1:
+ failures[0].reraise()
+ elif len(failures) > 1:
+ raise exc.WrappedFailure(failures)
+
+ def reraise(self):
+ """Re-raise captured exception."""
+ if self._exc_info:
+ six.reraise(*self._exc_info)
+ else:
+ raise exc.WrappedFailure([self])
+
+ def check(self, *exc_classes):
+ """Check if any of exc_classes caused the failure.
+
+ Arguments of this method can be exception types or type
+ names (stings). If captured exception is instance of
+ exception of given type, the corresponding argument is
+ returned. Else, None is returned.
+ """
+ for cls in exc_classes:
+ if isinstance(cls, type):
+ err = reflection.get_class_name(cls)
+ else:
+ err = cls
+ if err in self._exc_type_names:
+ return cls
+ return None
+
+ def __str__(self):
+ return 'Failure: %s: %s' % (self._exc_type_names[0],
+ self._exception_str)
+
+ def __iter__(self):
+ """Iterate over exception type names."""
+ for et in self._exc_type_names:
+ yield et
+
+ @classmethod
+ def from_dict(cls, data):
+ data = dict(data)
+ version = data.pop('version', None)
+ if version != cls.DICT_VERSION:
+ raise ValueError('Invalid dict version of failure object: %r'
+ % version)
+ return cls(**data)
+
+ def to_dict(self):
+ return {
+ 'exception_str': self.exception_str,
+ 'traceback_str': self.traceback_str,
+ 'exc_type_names': list(self),
+ 'version': self.DICT_VERSION,
+ }
+
+ def copy(self):
+ return Failure(exc_info=copy_exc_info(self.exc_info),
+ exception_str=self.exception_str,
+ traceback_str=self.traceback_str,
+ exc_type_names=self._exc_type_names[:])