diff options
| author | Jeremy Stanley <fungi@yuggoth.org> | 2014-03-31 12:51:17 +0000 |
|---|---|---|
| committer | Jeremy Stanley <fungi@yuggoth.org> | 2014-03-31 12:59:31 +0000 |
| commit | c95bf4165d41241fdbbfefe62653450eb7e5e838 (patch) | |
| tree | b949126192d60a7ddb43e3d2db1779947779ea27 /taskflow | |
| parent | c2a4989ea88000dc3a532588a96bab23c233d5fb (diff) | |
| download | taskflow-c95bf4165d41241fdbbfefe62653450eb7e5e838.tar.gz | |
Revert "Move taskflow.utils.misc.Failure to its own module"
This reverts commit 42ca240e8157b840c298d14fbf478ae570376633 which
was a breaking change in a library consumed by other OpenStack
projects with no deprecation or backwards compatibility
considerations. It was able to merge because openstack/taskflow is
apparently not yet part of the integrated gate via the proposed
I202f4809afd689155e2cc4a00fc704fd772a0e92 change.
Change-Id: I96cf36dc317499df91e43502efc85221f8177395
Closes-Bug: #1300161
Diffstat (limited to 'taskflow')
26 files changed, 368 insertions, 397 deletions
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 83d5d5f..2e43f84 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -24,7 +24,6 @@ from taskflow.engines.action_engine import task_action from taskflow.engines import base from taskflow import exceptions as exc -from taskflow import failure from taskflow.openstack.common import excutils from taskflow import retry from taskflow import states @@ -116,7 +115,7 @@ class ActionEngine(base.EngineBase): self._change_state(state) if state != states.SUSPENDED and state != states.SUCCESS: failures = self.storage.get_failures() - failure.Failure.reraise_if_any(failures.values()) + misc.Failure.reraise_if_any(failures.values()) @lock_utils.locked(lock='_state_lock') def _change_state(self, state): diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 9837ad9..846cc56 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -19,8 +19,8 @@ import abc from concurrent import futures import six -from taskflow import failure from taskflow.utils import async_utils +from taskflow.utils import misc from taskflow.utils import threading_utils # Execution and reversion events. @@ -35,7 +35,7 @@ def _execute_task(task, arguments, progress_callback): except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. - result = failure.Failure() + result = misc.Failure() return (task, EXECUTED, result) @@ -49,7 +49,7 @@ def _revert_task(task, arguments, result, failures, progress_callback): except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. - result = failure.Failure() + result = misc.Failure() return (task, REVERTED, result) diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index c7d54b8..ae7f901 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -16,10 +16,10 @@ from taskflow.engines.action_engine import executor as ex from taskflow import exceptions as excp -from taskflow import failure from taskflow import retry as r from taskflow import states as st from taskflow import task +from taskflow.utils import misc _WAITING_TIMEOUT = 60 # in seconds @@ -77,7 +77,7 @@ class FutureGraphAction(object): node, event, result = future.result() if isinstance(node, task.BaseTask): self._complete_task(node, event, result) - if isinstance(result, failure.Failure): + if isinstance(result, misc.Failure): if event == ex.EXECUTED: self._process_atom_failure(node, result) else: @@ -88,7 +88,7 @@ class FutureGraphAction(object): not_done.extend(self._schedule(next_nodes)) if failures: - failure.Failure.reraise_if_any(failures) + misc.Failure.reraise_if_any(failures) if self._analyzer.get_next_nodes(): return st.SUSPENDED diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py index c6ca85b..a860f69 100644 --- a/taskflow/engines/action_engine/retry_action.py +++ b/taskflow/engines/action_engine/retry_action.py @@ -18,9 +18,9 @@ import logging from taskflow.engines.action_engine import executor as ex from taskflow import exceptions -from taskflow import failure from taskflow import states from taskflow.utils import async_utils +from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -63,7 +63,7 @@ class RetryAction(object): try: result = retry.execute(**kwargs) except Exception: - result = failure.Failure() + result = misc.Failure() self.change_state(retry, states.FAILURE, result=result) else: self.change_state(retry, states.SUCCESS, result=result) @@ -79,7 +79,7 @@ class RetryAction(object): try: result = retry.revert(**kwargs) except Exception: - result = failure.Failure() + result = misc.Failure() self.change_state(retry, states.FAILURE) else: self.change_state(retry, states.REVERTED) diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index ac2c9c1..32c0a17 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -17,8 +17,8 @@ import logging from taskflow import exceptions -from taskflow import failure from taskflow import states +from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -71,7 +71,7 @@ class TaskAction(object): self._on_update_progress) def complete_execution(self, task, result): - if isinstance(result, failure.Failure): + if isinstance(result, misc.Failure): self.change_state(task, states.FAILURE, result=result) else: self.change_state(task, states.SUCCESS, @@ -91,7 +91,7 @@ class TaskAction(object): return future def complete_reversion(self, task, rev_result): - if isinstance(rev_result, failure.Failure): + if isinstance(rev_result, misc.Failure): self.change_state(task, states.FAILURE) else: self.change_state(task, states.REVERTED, progress=1.0) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index c6b259b..4c4fc06 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -24,7 +24,6 @@ from taskflow.engines.worker_based import cache from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow import exceptions as exc -from taskflow import failure from taskflow.utils import async_utils from taskflow.utils import misc @@ -126,7 +125,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): """ LOG.debug("Request '%r' has expired.", request) LOG.debug("The '%r' request has expired.", request) - request.set_result(failure.Failure.from_exception( + request.set_result(misc.Failure.from_exception( exc.RequestTimeout("The '%r' request has expired" % request))) def _on_wait(self): @@ -162,11 +161,11 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): reply_to=self._uuid, correlation_id=request.uuid) except Exception: - with failure.capture_failure() as fail: + with misc.capture_failure() as failure: LOG.exception("Failed to submit the '%s' request." % request) self._requests_cache.delete(request.uuid) - request.set_result(fail) + request.set_result(failure) def _notify_topics(self): """Cyclically publish notify message to each topic.""" diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index b8f8939..7085905 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -21,7 +21,6 @@ import six from concurrent import futures from taskflow.engines.action_engine import executor -from taskflow import failure from taskflow.utils import misc from taskflow.utils import reflection @@ -137,7 +136,7 @@ class Request(Message): return False def to_dict(self): - """Return json-serializable request, converting all `failure.Failure` + """Return json-serializable request, converting all `misc.Failure` objects into dictionaries. """ request = dict(task_cls=self._task_cls, task_name=self._task.name, @@ -145,15 +144,15 @@ class Request(Message): arguments=self._arguments) if 'result' in self._kwargs: result = self._kwargs['result'] - if isinstance(result, failure.Failure): + if isinstance(result, misc.Failure): request['result'] = ('failure', result.to_dict()) else: request['result'] = ('success', result) if 'failures' in self._kwargs: failures = self._kwargs['failures'] request['failures'] = {} - for task, fail in six.iteritems(failures): - request['failures'][task] = fail.to_dict() + for task, failure in six.iteritems(failures): + request['failures'][task] = failure.to_dict() return request def set_result(self, result): @@ -183,7 +182,7 @@ class Response(Message): state = data['state'] data = data['data'] if state == FAILURE and 'result' in data: - data['result'] = failure.Failure.from_dict(data['result']) + data['result'] = misc.Failure.from_dict(data['result']) return cls(state, **data) @property diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index b026c0c..02f5664 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -21,7 +21,7 @@ from kombu import exceptions as kombu_exc from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy -from taskflow import failure +from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -77,22 +77,21 @@ class Server(object): @staticmethod def _parse_request(task_cls, task_name, action, arguments, result=None, failures=None, **kwargs): - """Parse request before it can be processed. - - All `failure.Failure` objects that have been converted to dict on the - remote side to be serializable are now converted back to objects. + """Parse request before it can be processed. All `misc.Failure` objects + that have been converted to dict on the remote side to be serializable + are now converted back to objects. """ action_args = dict(arguments=arguments, task_name=task_name) if result is not None: data_type, data = result if data_type == 'failure': - action_args['result'] = failure.Failure.from_dict(data) + action_args['result'] = misc.Failure.from_dict(data) else: action_args['result'] = data if failures is not None: action_args['failures'] = {} for k, v in failures.items(): - action_args['failures'][k] = failure.Failure.from_dict(v) + action_args['failures'][k] = misc.Failure.from_dict(v) return task_cls, action, action_args @staticmethod @@ -162,19 +161,19 @@ class Server(object): action_args.update(task_uuid=task_uuid, progress_callback=progress_callback) except ValueError: - with failure.capture_failure() as fail: + with misc.capture_failure() as failure: LOG.exception("Failed to parse request") - reply_callback(result=fail.to_dict()) + reply_callback(result=failure.to_dict()) return # get task endpoint try: endpoint = self._endpoints[task_cls] except KeyError: - with failure.capture_failure() as fail: + with misc.capture_failure() as failure: LOG.exception("The '%s' task endpoint does not exist", task_cls) - reply_callback(result=fail.to_dict()) + reply_callback(result=failure.to_dict()) return else: reply_callback(state=pr.RUNNING) @@ -183,11 +182,11 @@ class Server(object): try: result = getattr(endpoint, action)(**action_args) except Exception: - with failure.capture_failure() as fail: + with misc.capture_failure() as failure: LOG.exception("The %s task execution failed", endpoint) - reply_callback(result=fail.to_dict()) + reply_callback(result=failure.to_dict()) else: - if isinstance(result, failure.Failure): + if isinstance(result, misc.Failure): reply_callback(result=result.to_dict()) else: reply_callback(state=pr.SUCCESS, result=result) diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py index 843bd11..17ae632 100644 --- a/taskflow/examples/wrapped_exception.py +++ b/taskflow/examples/wrapped_exception.py @@ -31,9 +31,10 @@ sys.path.insert(0, top_dir) import taskflow.engines from taskflow import exceptions -from taskflow import failure from taskflow.patterns import unordered_flow as uf from taskflow import task +from taskflow.tests import utils +from taskflow.utils import misc # INTRO: In this example we create two tasks which can trigger exceptions # based on various inputs to show how to analyze the thrown exceptions for @@ -95,20 +96,20 @@ def run(**store): SecondTask() ) try: - with failure.wrap_all_failures(): + with utils.wrap_all_failures(): taskflow.engines.run(flow, store=store, engine_conf='parallel') except exceptions.WrappedFailure as ex: unknown_failures = [] - for fail in ex: - if fail.check(FirstException): - print("Got FirstException: %s" % fail.exception_str) - elif fail.check(SecondException): - print("Got SecondException: %s" % fail.exception_str) + for failure in ex: + if failure.check(FirstException): + print("Got FirstException: %s" % failure.exception_str) + elif failure.check(SecondException): + print("Got SecondException: %s" % failure.exception_str) else: - print("Unknown failure: %s" % fail) - unknown_failures.append(fail) - failure.Failure.reraise_if_any(unknown_failures) + print("Unknown failure: %s" % failure) + unknown_failures.append(failure) + misc.Failure.reraise_if_any(unknown_failures) print_wrapped("Raise and catch first exception only") diff --git a/taskflow/failure.py b/taskflow/failure.py deleted file mode 100644 index 5dd825b..0000000 --- a/taskflow/failure.py +++ /dev/null @@ -1,231 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2013-2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib -import sys -import traceback - -import six - -from taskflow import exceptions as exc -from taskflow.utils import misc -from taskflow.utils import reflection - - -@contextlib.contextmanager -def wrap_all_failures(): - """Convert any exceptions to WrappedFailure. - - When you expect several failures, it may be convenient - to wrap any exception with WrappedFailure in order to - unify error handling. - """ - try: - yield - except Exception: - raise exc.WrappedFailure([Failure()]) - - -@contextlib.contextmanager -def capture_failure(): - """Save current exception, and yield back the failure (or raises a - runtime error if no active exception is being handled). - - In some cases the exception context can be cleared, resulting in None - being attempted to be saved after an exception handler is run. This - can happen when eventlet switches greenthreads or when running an - exception handler, code raises and catches an exception. In both - cases the exception context will be cleared. - - To work around this, we save the exception state, yield a failure and - then run other code. - - For example:: - - except Exception: - with capture_failure() as fail: - LOG.warn("Activating cleanup") - cleanup() - save_failure(fail) - """ - exc_info = sys.exc_info() - if not any(exc_info): - raise RuntimeError("No active exception is being handled") - else: - yield Failure(exc_info=exc_info) - - -class Failure(object): - """Object that represents failure. - - Failure objects encapsulate exception information so that - it can be re-used later to re-raise or inspect. - """ - DICT_VERSION = 1 - - def __init__(self, exc_info=None, **kwargs): - if not kwargs: - if exc_info is None: - exc_info = sys.exc_info() - self._exc_info = exc_info - self._exc_type_names = list( - reflection.get_all_class_names(exc_info[0], up_to=Exception)) - if not self._exc_type_names: - raise TypeError('Invalid exception type: %r' % exc_info[0]) - self._exception_str = exc.exception_message(self._exc_info[1]) - self._traceback_str = ''.join( - traceback.format_tb(self._exc_info[2])) - else: - self._exc_info = exc_info # may be None - self._exception_str = kwargs.pop('exception_str') - self._exc_type_names = kwargs.pop('exc_type_names', []) - self._traceback_str = kwargs.pop('traceback_str', None) - if kwargs: - raise TypeError( - 'Failure.__init__ got unexpected keyword argument(s): %s' - % ', '.join(six.iterkeys(kwargs))) - - @classmethod - def from_exception(cls, exception): - return cls((type(exception), exception, None)) - - def _matches(self, other): - if self is other: - return True - return (self._exc_type_names == other._exc_type_names - and self.exception_str == other.exception_str - and self.traceback_str == other.traceback_str) - - def matches(self, other): - if not isinstance(other, Failure): - return False - if self.exc_info is None or other.exc_info is None: - return self._matches(other) - else: - return self == other - - def __eq__(self, other): - if not isinstance(other, Failure): - return NotImplemented - return (self._matches(other) and - misc.are_equal_exc_info_tuples(self.exc_info, other.exc_info)) - - def __ne__(self, other): - return not (self == other) - - # NOTE(imelnikov): obj.__hash__() should return same values for equal - # objects, so we should redefine __hash__. Failure equality semantics - # is a bit complicated, so for now we just mark Failure objects as - # unhashable. See python docs on object.__hash__ for more info: - # http://docs.python.org/2/reference/datamodel.html#object.__hash__ - __hash__ = None - - @property - def exception(self): - """Exception value, or None if exception value is not present. - - Exception value may be lost during serialization. - """ - if self._exc_info: - return self._exc_info[1] - else: - return None - - @property - def exception_str(self): - """String representation of exception.""" - return self._exception_str - - @property - def exc_info(self): - """Exception info tuple or None.""" - return self._exc_info - - @property - def traceback_str(self): - """Exception traceback as string.""" - return self._traceback_str - - @staticmethod - def reraise_if_any(failures): - """Re-raise exceptions if argument is not empty. - - If argument is empty list, this method returns None. If - argument is list with single Failure object in it, - this failure is reraised. Else, WrappedFailure exception - is raised with failures list as causes. - """ - failures = list(failures) - if len(failures) == 1: - failures[0].reraise() - elif len(failures) > 1: - raise exc.WrappedFailure(failures) - - def reraise(self): - """Re-raise captured exception.""" - if self._exc_info: - six.reraise(*self._exc_info) - else: - raise exc.WrappedFailure([self]) - - def check(self, *exc_classes): - """Check if any of exc_classes caused the failure. - - Arguments of this method can be exception types or type - names (stings). If captured exception is instance of - exception of given type, the corresponding argument is - returned. Else, None is returned. - """ - for cls in exc_classes: - if isinstance(cls, type): - err = reflection.get_class_name(cls) - else: - err = cls - if err in self._exc_type_names: - return cls - return None - - def __str__(self): - return 'Failure: %s: %s' % (self._exc_type_names[0], - self._exception_str) - - def __iter__(self): - """Iterate over exception type names.""" - for et in self._exc_type_names: - yield et - - @classmethod - def from_dict(cls, data): - data = dict(data) - version = data.pop('version', None) - if version != cls.DICT_VERSION: - raise ValueError('Invalid dict version of failure object: %r' - % version) - return cls(**data) - - def to_dict(self): - return { - 'exception_str': self.exception_str, - 'traceback_str': self.traceback_str, - 'exc_type_names': list(self), - 'version': self.DICT_VERSION, - } - - def copy(self): - return Failure(exc_info=misc.copy_exc_info(self.exc_info), - exception_str=self.exception_str, - traceback_str=self.traceback_str, - exc_type_names=self._exc_type_names[:]) diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index 9c601d4..e8f1674 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -21,7 +21,6 @@ import logging import six -from taskflow import failure from taskflow.openstack.common import excutils from taskflow import states from taskflow.utils import misc @@ -142,7 +141,7 @@ class LoggingBase(ListenerBase): result = details.get('result') exc_info = None was_failure = False - if isinstance(result, failure.Failure): + if isinstance(result, misc.Failure): if result.exc_info: exc_info = tuple(result.exc_info) was_failure = True diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 248c545..2067ff5 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -31,7 +31,6 @@ from sqlalchemy import orm as sa_orm from sqlalchemy import pool as sa_pool from taskflow import exceptions as exc -from taskflow import failure from taskflow.persistence.backends import base from taskflow.persistence.backends.sqlalchemy import migration from taskflow.persistence.backends.sqlalchemy import models @@ -268,7 +267,7 @@ class Connection(base.Connection): pass except sa_exc.OperationalError as ex: if _is_db_connection_error(six.text_type(ex.args[0])): - failures.append(failure.Failure()) + failures.append(misc.Failure()) return False return True @@ -514,7 +513,7 @@ def _convert_ad_to_external(ad): # to change the internal sqlalchemy model easily by forcing a defined # interface (that isn't the sqlalchemy model itself). atom_cls = logbook.atom_detail_class(ad.atom_type) - result = atom_cls.from_dict({ + return atom_cls.from_dict({ 'state': ad.state, 'intention': ad.intention, 'results': ad.results, @@ -524,7 +523,6 @@ def _convert_ad_to_external(ad): 'name': ad.name, 'uuid': ad.uuid, }) - return result def _convert_lb_to_external(lb_m): diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index 4dece26..f378b67 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -21,10 +21,10 @@ import logging import six -from taskflow import failure from taskflow.openstack.common import timeutils from taskflow.openstack.common import uuidutils from taskflow import states +from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -311,11 +311,11 @@ class AtomDetail(object): def _to_dict_shared(self): if self.failure: - fail = self.failure.to_dict() + failure = self.failure.to_dict() else: - fail = None + failure = None return { - 'failure': fail, + 'failure': failure, 'meta': self.meta, 'name': self.name, 'results': self.results, @@ -331,9 +331,9 @@ class AtomDetail(object): self.results = data.get('results') self.version = data.get('version') self.meta = _fix_meta(data) - fail = data.get('failure') - if fail: - self.failure = failure.Failure.from_dict(fail) + failure = data.get('failure') + if failure: + self.failure = misc.Failure.from_dict(failure) @property def uuid(self): @@ -405,7 +405,7 @@ class RetryDetail(AtomDetail): for (data, failures) in results: new_failures = {} for (key, failure_data) in six.iteritems(failures): - new_failures[key] = failure.Failure.from_dict(failure_data) + new_failures[key] = misc.Failure.from_dict(failure_data) new_results.append((data, new_failures)) return new_results @@ -423,8 +423,8 @@ class RetryDetail(AtomDetail): new_results = [] for (data, failures) in results: new_failures = {} - for (key, fail) in six.iteritems(failures): - new_failures[key] = fail.to_dict() + for (key, failure) in six.iteritems(failures): + new_failures[key] = failure.to_dict() new_results.append((data, new_failures)) return new_results @@ -443,11 +443,11 @@ class RetryDetail(AtomDetail): # contain tracebacks, which are not copyable. for (data, failures) in other.results: copied_failures = {} - for (key, fail) in six.iteritems(failures): + for (key, failure) in six.iteritems(failures): if deep_copy: - copied_failures[key] = fail.copy() + copied_failures[key] = failure.copy() else: - copied_failures[key] = fail + copied_failures[key] = failure results.append((data, copied_failures)) self.results = results return self diff --git a/taskflow/storage.py b/taskflow/storage.py index 23c90d5..dca870f 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -21,7 +21,6 @@ import logging import six from taskflow import exceptions -from taskflow import failure from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook from taskflow import states @@ -54,7 +53,7 @@ class Storage(object): self._lock = self._lock_cls() # NOTE(imelnikov): failure serialization looses information, - # so we cache failures here, in task name -> failure.Failure mapping. + # so we cache failures here, in task name -> misc.Failure mapping. self._failures = {} for ad in self._flowdetail: if ad.failure is not None: @@ -326,7 +325,7 @@ class Storage(object): with self._lock.write_lock(): ad = self._atomdetail_by_name(atom_name) ad.state = state - if state == states.FAILURE and isinstance(data, failure.Failure): + if state == states.FAILURE and isinstance(data, misc.Failure): # FIXME(harlowja): this seems like it should be internal logic # in the atom detail object and not in here. Fix that soon... # diff --git a/taskflow/test.py b/taskflow/test.py index 0ecbce4..ce99a37 100644 --- a/taskflow/test.py +++ b/taskflow/test.py @@ -24,7 +24,7 @@ import fixtures import six from taskflow import exceptions -from taskflow import failure +from taskflow.tests import utils from taskflow.utils import misc @@ -49,13 +49,13 @@ class FailureRegexpMatcher(object): self.exc_class = exc_class self.pattern = pattern - def match(self, fail): - for cause in fail: + def match(self, failure): + for cause in failure: if cause.check(self.exc_class) is not None: return matchers.MatchesRegex( self.pattern).match(cause.exception_str) return matchers.Mismatch("The `%s` wasn't caused by the `%s`" % - (fail, self.exc_class)) + (failure, self.exc_class)) class ItemsEqual(object): @@ -171,7 +171,7 @@ class TestCase(testcase.TestCase): string matches to the given pattern. """ try: - with failure.wrap_all_failures(): + with utils.wrap_all_failures(): callable_obj(*args, **kwargs) except exceptions.WrappedFailure as e: self.assertThat(e, FailureRegexpMatcher(exc_class, pattern)) diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py index ca5a2db..3d28695 100644 --- a/taskflow/tests/unit/persistence/base.py +++ b/taskflow/tests/unit/persistence/base.py @@ -17,10 +17,10 @@ import contextlib from taskflow import exceptions as exc -from taskflow import failure from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook from taskflow import states +from taskflow.utils import misc class PersistenceTestMixin(object): @@ -147,7 +147,7 @@ class PersistenceTestMixin(object): try: raise RuntimeError('Woot!') except Exception: - td.failure = failure.Failure() + td.failure = misc.Failure() fd.add(td) @@ -161,10 +161,10 @@ class PersistenceTestMixin(object): lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) td2 = fd2.find(td.uuid) - fail = td2.failure - self.assertEqual(fail.exception_str, 'Woot!') - self.assertIs(fail.check(RuntimeError), RuntimeError) - self.assertEqual(fail.traceback_str, td.failure.traceback_str) + failure = td2.failure + self.assertEqual(failure.exception_str, 'Woot!') + self.assertIs(failure.check(RuntimeError), RuntimeError) + self.assertEqual(failure.traceback_str, td.failure.traceback_str) self.assertIsInstance(td2, logbook.TaskDetail) def test_logbook_merge_flow_detail(self): @@ -269,7 +269,7 @@ class PersistenceTestMixin(object): fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) - fail = failure.Failure.from_exception(RuntimeError('fail')) + fail = misc.Failure.from_exception(RuntimeError('fail')) rd.results.append((42, {'some-task': fail})) fd.add(rd) @@ -286,7 +286,7 @@ class PersistenceTestMixin(object): rd2 = fd2.find(rd.uuid) self.assertIsInstance(rd2, logbook.RetryDetail) fail2 = rd2.results[0][1].get('some-task') - self.assertIsInstance(fail2, failure.Failure) + self.assertIsInstance(fail2, misc.Failure) self.assertTrue(fail.matches(fail2)) def test_retry_detail_save_intention(self): diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 26bbcd9..d2401f4 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -31,7 +31,6 @@ from taskflow.engines.action_engine import engine as eng from taskflow.engines.worker_based import engine as w_eng from taskflow.engines.worker_based import worker as wkr from taskflow import exceptions as exc -from taskflow import failure from taskflow.persistence import logbook from taskflow import states from taskflow import task @@ -39,6 +38,7 @@ from taskflow import test from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu +from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -488,7 +488,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase): self.assertEqual(result, 'RESULT') self.assertEqual(list(flow_failures.keys()), ['fail1']) fail = flow_failures['fail1'] - self.assertIsInstance(fail, failure.Failure) + self.assertIsInstance(fail, misc.Failure) self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!') flow = lf.Flow('test').add( diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 44874f1..42f4a5f 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -21,11 +21,11 @@ from taskflow.patterns import unordered_flow as uf import taskflow.engines from taskflow import exceptions as exc -from taskflow import failure from taskflow import retry from taskflow import states as st from taskflow import test from taskflow.tests import utils +from taskflow.utils import misc class RetryTest(utils.EngineTestBase): @@ -558,7 +558,7 @@ class RetryTest(utils.EngineTestBase): # we execute retry engine.storage.save('flow-1_retry', 1) # task fails - fail = failure.Failure.from_exception(RuntimeError('foo')), + fail = misc.Failure.from_exception(RuntimeError('foo')), engine.storage.save('task1', fail, state=st.FAILURE) if when == 'task fails': return engine @@ -634,7 +634,7 @@ class RetryTest(utils.EngineTestBase): self._make_engine(flow).run) self.assertEqual(len(r.history), 1) self.assertEqual(r.history[0][1], {}) - self.assertEqual(isinstance(r.history[0][0], failure.Failure), True) + self.assertEqual(isinstance(r.history[0][0], misc.Failure), True) def test_retry_revert_fails(self): @@ -690,7 +690,7 @@ class RetryTest(utils.EngineTestBase): engine.storage.save('test2_retry', 1) engine.storage.save('b', 11) # pretend that 'c' failed - fail = failure.Failure.from_exception(RuntimeError('Woot!')) + fail = misc.Failure.from_exception(RuntimeError('Woot!')) engine.storage.save('c', fail, st.FAILURE) engine.run() diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 2c6fcd2..eb08819 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -20,13 +20,13 @@ import threading import mock from taskflow import exceptions -from taskflow import failure from taskflow.openstack.common import uuidutils from taskflow.persistence import backends from taskflow.persistence import logbook from taskflow import states from taskflow import storage from taskflow import test +from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -127,46 +127,46 @@ class StorageTestMixin(object): self.assertEqual(s.get_atom_state('my task'), states.FAILURE) def test_save_and_get_cached_failure(self): - fail = failure.Failure.from_exception(RuntimeError('Woot!')) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', fail, states.FAILURE) - self.assertEqual(s.get('my task'), fail) + s.save('my task', failure, states.FAILURE) + self.assertEqual(s.get('my task'), failure) self.assertEqual(s.get_atom_state('my task'), states.FAILURE) self.assertTrue(s.has_failures()) - self.assertEqual(s.get_failures(), {'my task': fail}) + self.assertEqual(s.get_failures(), {'my task': failure}) def test_save_and_get_non_cached_failure(self): - fail = failure.Failure.from_exception(RuntimeError('Woot!')) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', fail, states.FAILURE) - self.assertEqual(s.get('my task'), fail) + s.save('my task', failure, states.FAILURE) + self.assertEqual(s.get('my task'), failure) s._failures['my task'] = None - self.assertTrue(fail.matches(s.get('my task'))) + self.assertTrue(failure.matches(s.get('my task'))) def test_get_failure_from_reverted_task(self): - fail = failure.Failure.from_exception(RuntimeError('Woot!')) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', fail, states.FAILURE) + s.save('my task', failure, states.FAILURE) s.set_atom_state('my task', states.REVERTING) - self.assertEqual(s.get('my task'), fail) + self.assertEqual(s.get('my task'), failure) s.set_atom_state('my task', states.REVERTED) - self.assertEqual(s.get('my task'), fail) + self.assertEqual(s.get('my task'), failure) def test_get_failure_after_reload(self): - fail = failure.Failure.from_exception(RuntimeError('Woot!')) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', fail, states.FAILURE) + s.save('my task', failure, states.FAILURE) s2 = self._get_storage(s._flowdetail) self.assertTrue(s2.has_failures()) self.assertEqual(1, len(s2.get_failures())) - self.assertTrue(fail.matches(s2.get('my task'))) + self.assertTrue(failure.matches(s2.get('my task'))) self.assertEqual(s2.get_atom_state('my task'), states.FAILURE) def test_get_non_existing_var(self): @@ -483,15 +483,15 @@ class StorageTestMixin(object): self.assertEqual(s.fetch_all(), {}) def test_cached_retry_failure(self): - fail = failure.Failure.from_exception(RuntimeError('Woot!')) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_retry('my retry', result_mapping={'x': 0}) s.save('my retry', 'a') - s.save('my retry', fail, states.FAILURE) + s.save('my retry', failure, states.FAILURE) history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {}), (fail, {})]) + self.assertEqual(history, [('a', {}), (failure, {})]) self.assertIs(s.has_failures(), True) - self.assertEqual(s.get_failures(), {'my retry': fail}) + self.assertEqual(s.get_failures(), {'my retry': failure}) def test_logbook_get_unknown_atom_type(self): self.assertRaisesRegexp(TypeError, diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 5923dc4..1c5c197 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -19,7 +19,6 @@ import functools import sys import time -from taskflow import failure from taskflow import states from taskflow import test from taskflow.tests import utils as test_utils @@ -285,8 +284,8 @@ class GetClassNameTest(test.TestCase): self.assertEqual(name, 'RuntimeError') def test_global_class(self): - name = reflection.get_class_name(failure.Failure) - self.assertEqual(name, 'taskflow.failure.Failure') + name = reflection.get_class_name(misc.Failure) + self.assertEqual(name, 'taskflow.utils.misc.Failure') def test_class(self): name = reflection.get_class_name(Class) diff --git a/taskflow/tests/unit/test_utils_failure.py b/taskflow/tests/unit/test_utils_failure.py index be42129..394abfd 100644 --- a/taskflow/tests/unit/test_utils_failure.py +++ b/taskflow/tests/unit/test_utils_failure.py @@ -20,14 +20,14 @@ from taskflow import exceptions from taskflow import test from taskflow.tests import utils as test_utils -from taskflow import failure +from taskflow.utils import misc def _captured_failure(msg): try: raise RuntimeError(msg) except Exception: - return failure.Failure() + return misc.Failure() class GeneralFailureObjTestsMixin(object): @@ -82,9 +82,9 @@ class ReCreatedFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin): def setUp(self): super(ReCreatedFailureTestCase, self).setUp() fail_obj = _captured_failure('Woot!') - self.fail_obj = failure.Failure(exception_str=fail_obj.exception_str, - traceback_str=fail_obj.traceback_str, - exc_type_names=list(fail_obj)) + self.fail_obj = misc.Failure(exception_str=fail_obj.exception_str, + traceback_str=fail_obj.traceback_str, + exc_type_names=list(fail_obj)) def test_value_lost(self): self.assertIs(self.fail_obj.exception, None) @@ -102,7 +102,7 @@ class FromExceptionTestCase(test.TestCase, GeneralFailureObjTestsMixin): def setUp(self): super(FromExceptionTestCase, self).setUp() - self.fail_obj = failure.Failure.from_exception(RuntimeError('Woot!')) + self.fail_obj = misc.Failure.from_exception(RuntimeError('Woot!')) class FailureObjectTestCase(test.TestCase): @@ -111,10 +111,10 @@ class FailureObjectTestCase(test.TestCase): try: raise SystemExit() except BaseException: - self.assertRaises(TypeError, failure.Failure) + self.assertRaises(TypeError, misc.Failure) def test_unknown_argument(self): - exc = self.assertRaises(TypeError, failure.Failure, + exc = self.assertRaises(TypeError, misc.Failure, exception_str='Woot!', traceback_str=None, exc_type_names=['Exception'], @@ -123,12 +123,12 @@ class FailureObjectTestCase(test.TestCase): self.assertEqual(str(exc), expected) def test_empty_does_not_reraise(self): - self.assertIs(failure.Failure.reraise_if_any([]), None) + self.assertIs(misc.Failure.reraise_if_any([]), None) def test_reraises_one(self): fls = [_captured_failure('Woot!')] self.assertRaisesRegexp(RuntimeError, '^Woot!$', - failure.Failure.reraise_if_any, fls) + misc.Failure.reraise_if_any, fls) def test_reraises_several(self): fls = [ @@ -136,7 +136,7 @@ class FailureObjectTestCase(test.TestCase): _captured_failure('Oh, not again!') ] exc = self.assertRaises(exceptions.WrappedFailure, - failure.Failure.reraise_if_any, fls) + misc.Failure.reraise_if_any, fls) self.assertEqual(list(exc), fls) def test_failure_copy(self): @@ -149,9 +149,9 @@ class FailureObjectTestCase(test.TestCase): def test_failure_copy_recaptured(self): captured = _captured_failure('Woot!') - fail_obj = failure.Failure(exception_str=captured.exception_str, - traceback_str=captured.traceback_str, - exc_type_names=list(captured)) + fail_obj = misc.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) copied = fail_obj.copy() self.assertIsNot(fail_obj, copied) self.assertEqual(fail_obj, copied) @@ -160,9 +160,9 @@ class FailureObjectTestCase(test.TestCase): def test_recaptured_not_eq(self): captured = _captured_failure('Woot!') - fail_obj = failure.Failure(exception_str=captured.exception_str, - traceback_str=captured.traceback_str, - exc_type_names=list(captured)) + fail_obj = misc.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) self.assertFalse(fail_obj == captured) self.assertTrue(fail_obj != captured) self.assertTrue(fail_obj.matches(captured)) @@ -174,13 +174,13 @@ class FailureObjectTestCase(test.TestCase): def test_two_recaptured_neq(self): captured = _captured_failure('Woot!') - fail_obj = failure.Failure(exception_str=captured.exception_str, - traceback_str=captured.traceback_str, - exc_type_names=list(captured)) + fail_obj = misc.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) new_exc_str = captured.exception_str.replace('Woot', 'w00t') - fail_obj2 = failure.Failure(exception_str=new_exc_str, - traceback_str=captured.traceback_str, - exc_type_names=list(captured)) + fail_obj2 = misc.Failure(exception_str=new_exc_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) self.assertNotEqual(fail_obj, fail_obj2) self.assertFalse(fail_obj2.matches(fail_obj)) @@ -220,7 +220,7 @@ class WrappedFailureTestCase(test.TestCase): try: raise exceptions.WrappedFailure([f1, f2]) except Exception: - fail_obj = failure.Failure() + fail_obj = misc.Failure() wf = exceptions.WrappedFailure([fail_obj, f3]) self.assertEqual(list(wf), [f1, f2, f3]) @@ -230,13 +230,13 @@ class NonAsciiExceptionsTestCase(test.TestCase): def test_exception_with_non_ascii_str(self): bad_string = chr(200) - fail = failure.Failure.from_exception(ValueError(bad_string)) + fail = misc.Failure.from_exception(ValueError(bad_string)) self.assertEqual(fail.exception_str, bad_string) self.assertEqual(str(fail), 'Failure: ValueError: %s' % bad_string) def test_exception_non_ascii_unicode(self): hi_ru = u'привет' - fail = failure.Failure.from_exception(ValueError(hi_ru)) + fail = misc.Failure.from_exception(ValueError(hi_ru)) self.assertEqual(fail.exception_str, hi_ru) self.assertIsInstance(fail.exception_str, six.text_type) self.assertEqual(six.text_type(fail), @@ -246,7 +246,7 @@ class NonAsciiExceptionsTestCase(test.TestCase): hi_cn = u'嗨' fail = ValueError(hi_cn) self.assertEqual(hi_cn, exceptions.exception_message(fail)) - fail = failure.Failure.from_exception(fail) + fail = misc.Failure.from_exception(fail) wrapped_fail = exceptions.WrappedFailure([fail]) if six.PY2: # Python 2.x will unicode escape it, while python 3.3+ will not, @@ -261,12 +261,12 @@ class NonAsciiExceptionsTestCase(test.TestCase): def test_failure_equality_with_non_ascii_str(self): bad_string = chr(200) - fail = failure.Failure.from_exception(ValueError(bad_string)) + fail = misc.Failure.from_exception(ValueError(bad_string)) copied = fail.copy() self.assertEqual(fail, copied) def test_failure_equality_non_ascii_unicode(self): hi_ru = u'привет' - fail = failure.Failure.from_exception(ValueError(hi_ru)) + fail = misc.Failure.from_exception(ValueError(hi_ru)) copied = fail.copy() self.assertEqual(fail, copied) diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index fc162e9..7509200 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -23,9 +23,9 @@ from kombu import exceptions as kombu_exc from taskflow.engines.worker_based import executor from taskflow.engines.worker_based import protocol as pr -from taskflow import failure from taskflow import test from taskflow.tests import utils +from taskflow.utils import misc class TestWorkerTaskExecutor(test.MockTestCase): @@ -111,8 +111,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()]) def test_on_message_response_state_failure(self): - fail = failure.Failure.from_exception(Exception('test')) - failure_dict = fail.to_dict() + failure = misc.Failure.from_exception(Exception('test')) + failure_dict = failure.to_dict() response = pr.Response(pr.FAILURE, result=failure_dict) ex = self.executor() ex._requests_cache.set(self.task_uuid, self.request_inst_mock) @@ -120,7 +120,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(len(ex._requests_cache._data), 0) self.assertEqual(self.request_inst_mock.mock_calls, [ - mock.call.set_result(result=utils.FailureMatcher(fail)) + mock.call.set_result(result=utils.FailureMatcher(failure)) ]) self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()]) diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index ea18ab9..27d6e00 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -19,9 +19,9 @@ import mock from concurrent import futures from taskflow.engines.worker_based import protocol as pr -from taskflow import failure from taskflow import test from taskflow.tests import utils +from taskflow.utils import misc class TestProtocol(test.TestCase): @@ -81,15 +81,15 @@ class TestProtocol(test.TestCase): self.request_to_dict(result=('success', None))) def test_to_dict_with_result_failure(self): - fail = failure.Failure.from_exception(RuntimeError('Woot!')) - expected = self.request_to_dict(result=('failure', fail.to_dict())) - self.assertEqual(self.request(result=fail).to_dict(), expected) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) + expected = self.request_to_dict(result=('failure', failure.to_dict())) + self.assertEqual(self.request(result=failure).to_dict(), expected) def test_to_dict_with_failures(self): - fail = failure.Failure.from_exception(RuntimeError('Woot!')) - request = self.request(failures={self.task.name: fail}) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) + request = self.request(failures={self.task.name: failure}) expected = self.request_to_dict( - failures={self.task.name: fail.to_dict()}) + failures={self.task.name: failure.to_dict()}) self.assertEqual(request.to_dict(), expected) @mock.patch('taskflow.engines.worker_based.protocol.misc.wallclock') diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index b155d77..a4eab7a 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -23,9 +23,9 @@ from kombu import exceptions as exc from taskflow.engines.worker_based import endpoint as ep from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import server -from taskflow import failure from taskflow import test from taskflow.tests import utils +from taskflow.utils import misc class TestServer(test.MockTestCase): @@ -185,19 +185,19 @@ class TestServer(test.MockTestCase): result=1))) def test_parse_request_with_failure_result(self): - fail = failure.Failure.from_exception(Exception('test')) - request = self.make_request(action='revert', result=fail) + failure = misc.Failure.from_exception(Exception('test')) + request = self.make_request(action='revert', result=failure) task_cls, action, task_args = server.Server._parse_request(**request) self.assertEqual((task_cls, action, task_args), (self.task.name, 'revert', dict(task_name=self.task.name, arguments=self.task_args, - result=utils.FailureMatcher(fail)))) + result=utils.FailureMatcher(failure)))) def test_parse_request_with_failures(self): - failures = {'0': failure.Failure.from_exception(Exception('test1')), - '1': failure.Failure.from_exception(Exception('test2'))} + failures = {'0': misc.Failure.from_exception(Exception('test1')), + '1': misc.Failure.from_exception(Exception('test2'))} request = self.make_request(action='revert', failures=failures) task_cls, action, task_args = server.Server._parse_request(**request) @@ -274,16 +274,16 @@ class TestServer(test.MockTestCase): self.assertEqual(self.master_mock.mock_calls, []) self.assertTrue(mocked_exception.called) - @mock.patch.object(failure.Failure, 'from_dict') - @mock.patch.object(failure.Failure, 'to_dict') + @mock.patch.object(misc.Failure, 'from_dict') + @mock.patch.object(misc.Failure, 'to_dict') def test_process_request_parse_request_failure(self, to_mock, from_mock): failure_dict = { 'failure': 'failure', } - fail = failure.Failure.from_exception(RuntimeError('Woot!')) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) to_mock.return_value = failure_dict from_mock.side_effect = ValueError('Woot!') - request = self.make_request(result=fail) + request = self.make_request(result=failure) # create server and process request s = self.server(reset_master_mock=True) @@ -298,7 +298,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(master_mock_calls, self.master_mock.mock_calls) - @mock.patch.object(failure.Failure, 'to_dict') + @mock.patch.object(misc.Failure, 'to_dict') def test_process_request_endpoint_not_found(self, to_mock): failure_dict = { 'failure': 'failure', @@ -319,7 +319,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch.object(failure.Failure, 'to_dict') + @mock.patch.object(misc.Failure, 'to_dict') def test_process_request_execution_failure(self, to_mock): failure_dict = { 'failure': 'failure', @@ -344,7 +344,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch.object(failure.Failure, 'to_dict') + @mock.patch.object(misc.Failure, 'to_dict') def test_process_request_task_failure(self, to_mock): failure_dict = { 'failure': 'failure', diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 5ed9b75..20fc375 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -19,15 +19,31 @@ import threading import six +from taskflow import exceptions from taskflow.persistence.backends import impl_memory from taskflow import retry from taskflow import task +from taskflow.utils import misc ARGS_KEY = '__args__' KWARGS_KEY = '__kwargs__' ORDER_KEY = '__order__' +@contextlib.contextmanager +def wrap_all_failures(): + """Convert any exceptions to WrappedFailure. + + When you expect several failures, it may be convenient + to wrap any exception with WrappedFailure in order to + unify error handling. + """ + try: + yield + except Exception: + raise exceptions.WrappedFailure([misc.Failure()]) + + class DummyTask(task.Task): def execute(self, context, *args, **kwargs): diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 1e8bcb1..1da28a4 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright (C) 2012-2014 Yahoo! Inc. All Rights Reserved. +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. # Copyright (C) 2013 Rackspace Hosting All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -16,6 +16,7 @@ # under the License. import collections +import contextlib import copy import errno import functools @@ -23,6 +24,7 @@ import keyword import logging import os import string +import sys import threading import time import traceback @@ -511,3 +513,195 @@ def are_equal_exc_info_tuples(ei1, ei2): tb1 = traceback.format_tb(ei1[2]) tb2 = traceback.format_tb(ei2[2]) return tb1 == tb2 + + +@contextlib.contextmanager +def capture_failure(): + """Save current exception, and yield back the failure (or raises a + runtime error if no active exception is being handled). + + In some cases the exception context can be cleared, resulting in None + being attempted to be saved after an exception handler is run. This + can happen when eventlet switches greenthreads or when running an + exception handler, code raises and catches an exception. In both + cases the exception context will be cleared. + + To work around this, we save the exception state, yield a failure and + then run other code. + + For example:: + + except Exception: + with capture_failure() as fail: + LOG.warn("Activating cleanup") + cleanup() + save_failure(fail) + """ + exc_info = sys.exc_info() + if not any(exc_info): + raise RuntimeError("No active exception is being handled") + else: + yield Failure(exc_info=exc_info) + + +class Failure(object): + """Object that represents failure. + + Failure objects encapsulate exception information so that + it can be re-used later to re-raise or inspect. + """ + DICT_VERSION = 1 + + def __init__(self, exc_info=None, **kwargs): + if not kwargs: + if exc_info is None: + exc_info = sys.exc_info() + self._exc_info = exc_info + self._exc_type_names = list( + reflection.get_all_class_names(exc_info[0], up_to=Exception)) + if not self._exc_type_names: + raise TypeError('Invalid exception type: %r' % exc_info[0]) + self._exception_str = exc.exception_message(self._exc_info[1]) + self._traceback_str = ''.join( + traceback.format_tb(self._exc_info[2])) + else: + self._exc_info = exc_info # may be None + self._exception_str = kwargs.pop('exception_str') + self._exc_type_names = kwargs.pop('exc_type_names', []) + self._traceback_str = kwargs.pop('traceback_str', None) + if kwargs: + raise TypeError( + 'Failure.__init__ got unexpected keyword argument(s): %s' + % ', '.join(six.iterkeys(kwargs))) + + @classmethod + def from_exception(cls, exception): + return cls((type(exception), exception, None)) + + def _matches(self, other): + if self is other: + return True + return (self._exc_type_names == other._exc_type_names + and self.exception_str == other.exception_str + and self.traceback_str == other.traceback_str) + + def matches(self, other): + if not isinstance(other, Failure): + return False + if self.exc_info is None or other.exc_info is None: + return self._matches(other) + else: + return self == other + + def __eq__(self, other): + if not isinstance(other, Failure): + return NotImplemented + return (self._matches(other) and + are_equal_exc_info_tuples(self.exc_info, other.exc_info)) + + def __ne__(self, other): + return not (self == other) + + # NOTE(imelnikov): obj.__hash__() should return same values for equal + # objects, so we should redefine __hash__. Failure equality semantics + # is a bit complicated, so for now we just mark Failure objects as + # unhashable. See python docs on object.__hash__ for more info: + # http://docs.python.org/2/reference/datamodel.html#object.__hash__ + __hash__ = None + + @property + def exception(self): + """Exception value, or None if exception value is not present. + + Exception value may be lost during serialization. + """ + if self._exc_info: + return self._exc_info[1] + else: + return None + + @property + def exception_str(self): + """String representation of exception.""" + return self._exception_str + + @property + def exc_info(self): + """Exception info tuple or None.""" + return self._exc_info + + @property + def traceback_str(self): + """Exception traceback as string.""" + return self._traceback_str + + @staticmethod + def reraise_if_any(failures): + """Re-raise exceptions if argument is not empty. + + If argument is empty list, this method returns None. If + argument is list with single Failure object in it, + this failure is reraised. Else, WrappedFailure exception + is raised with failures list as causes. + """ + failures = list(failures) + if len(failures) == 1: + failures[0].reraise() + elif len(failures) > 1: + raise exc.WrappedFailure(failures) + + def reraise(self): + """Re-raise captured exception.""" + if self._exc_info: + six.reraise(*self._exc_info) + else: + raise exc.WrappedFailure([self]) + + def check(self, *exc_classes): + """Check if any of exc_classes caused the failure. + + Arguments of this method can be exception types or type + names (stings). If captured exception is instance of + exception of given type, the corresponding argument is + returned. Else, None is returned. + """ + for cls in exc_classes: + if isinstance(cls, type): + err = reflection.get_class_name(cls) + else: + err = cls + if err in self._exc_type_names: + return cls + return None + + def __str__(self): + return 'Failure: %s: %s' % (self._exc_type_names[0], + self._exception_str) + + def __iter__(self): + """Iterate over exception type names.""" + for et in self._exc_type_names: + yield et + + @classmethod + def from_dict(cls, data): + data = dict(data) + version = data.pop('version', None) + if version != cls.DICT_VERSION: + raise ValueError('Invalid dict version of failure object: %r' + % version) + return cls(**data) + + def to_dict(self): + return { + 'exception_str': self.exception_str, + 'traceback_str': self.traceback_str, + 'exc_type_names': list(self), + 'version': self.DICT_VERSION, + } + + def copy(self): + return Failure(exc_info=copy_exc_info(self.exc_info), + exception_str=self.exception_str, + traceback_str=self.traceback_str, + exc_type_names=self._exc_type_names[:]) |
