diff options
author | Greg Hill <greg.hill@rackspace.com> | 2016-01-26 15:49:23 -0600 |
---|---|---|
committer | Greg Hill <greg.hill@rackspace.com> | 2016-01-28 16:29:03 -0600 |
commit | 8e8fd65225788183c848e19d927a00c729570f65 (patch) | |
tree | c3d556087f69ac5011bc6022af4791ee29a6fd88 | |
parent | f9a2d78bd8dd5d5d0698a89474fab14f60ddd77f (diff) | |
download | taskflow-8e8fd65225788183c848e19d927a00c729570f65.tar.gz |
Handle cases where exc_args can't be serialized as JSON in the WBE
First try to get them, but if they aren't able to be serialized,
just return the failure without them. This only affects the WBE
that serializes failures as part of the JSON response that is sent
over the wire from the worker process to the WBE engine.
Change-Id: I86e3255b612bc15097aabe63a684cf8b8808e61b
-rw-r--r-- | taskflow/engines/worker_based/protocol.py | 17 | ||||
-rw-r--r-- | taskflow/engines/worker_based/server.py | 10 | ||||
-rw-r--r-- | taskflow/tests/unit/test_failure.py | 10 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_protocol.py | 8 | ||||
-rw-r--r-- | taskflow/types/failure.py | 10 |
5 files changed, 45 insertions, 10 deletions
diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 63556c2..d2d8e34 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -20,6 +20,7 @@ import threading import fasteners import futurist +from oslo_serialization import jsonutils from oslo_utils import reflection from oslo_utils import timeutils import six @@ -100,6 +101,17 @@ RESPONSE = 'RESPONSE' LOG = logging.getLogger(__name__) +def failure_to_dict(failure): + failure_dict = failure.to_dict() + try: + # it's possible the exc_args can't be serialized as JSON + # if that's the case, just get the failure without them + jsonutils.dumps(failure_dict) + return failure_dict + except (TypeError, ValueError): + return failure.to_dict(include_args=False) + + @six.add_metaclass(abc.ABCMeta) class Message(object): """Base class for all message types.""" @@ -301,6 +313,7 @@ class Request(Message): convert all `failure.Failure` objects into dictionaries (which will then be reconstituted by the receiver). """ + request = { 'task_cls': reflection.get_class_name(self._task), 'task_name': self._task.name, @@ -311,14 +324,14 @@ class Request(Message): if 'result' in self._kwargs: result = self._kwargs['result'] if isinstance(result, ft.Failure): - request['result'] = ('failure', result.to_dict()) + request['result'] = ('failure', failure_to_dict(result)) else: request['result'] = ('success', result) if 'failures' in self._kwargs: failures = self._kwargs['failures'] request['failures'] = {} for task, failure in six.iteritems(failures): - request['failures'][task] = failure.to_dict() + request['failures'][task] = failure_to_dict(failure) return request def set_result(self, result): diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 4c3b32b..978ab3a 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -179,7 +179,7 @@ class Server(object): with misc.capture_failure() as failure: LOG.warn("Failed to parse request contents from message '%s'", ku.DelayedPretty(message), exc_info=True) - reply_callback(result=failure.to_dict()) + reply_callback(result=pr.failure_to_dict(failure)) return # Now fetch the task endpoint (and action handler on it). @@ -191,7 +191,7 @@ class Server(object): " to continue processing request message '%s'", work.task_cls, ku.DelayedPretty(message), exc_info=True) - reply_callback(result=failure.to_dict()) + reply_callback(result=pr.failure_to_dict(failure)) return else: try: @@ -202,7 +202,7 @@ class Server(object): " '%s', unable to continue processing request" " message '%s'", work.action, endpoint, ku.DelayedPretty(message), exc_info=True) - reply_callback(result=failure.to_dict()) + reply_callback(result=pr.failure_to_dict(failure)) return else: try: @@ -212,7 +212,7 @@ class Server(object): LOG.warn("The '%s' task '%s' generation for request" " message '%s' failed", endpoint, work.action, ku.DelayedPretty(message), exc_info=True) - reply_callback(result=failure.to_dict()) + reply_callback(result=pr.failure_to_dict(failure)) return else: if not reply_callback(state=pr.RUNNING): @@ -240,7 +240,7 @@ class Server(object): LOG.warn("The '%s' endpoint '%s' execution for request" " message '%s' failed", endpoint, work.action, ku.DelayedPretty(message), exc_info=True) - reply_callback(result=failure.to_dict()) + reply_callback(result=pr.failure_to_dict(failure)) else: # And be done with it! if isinstance(result, ft.Failure): diff --git a/taskflow/tests/unit/test_failure.py b/taskflow/tests/unit/test_failure.py index 6d12ac0..bc95dd0 100644 --- a/taskflow/tests/unit/test_failure.py +++ b/taskflow/tests/unit/test_failure.py @@ -283,6 +283,16 @@ class FailureObjectTestCase(test.TestCase): text = captured.pformat(traceback=True) self.assertIn("Traceback (most recent call last):", text) + def test_no_capture_exc_args(self): + captured = _captured_failure(Exception("I am not valid JSON")) + fail_obj = failure.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured), + exc_args=list(captured.exception_args)) + fail_json = fail_obj.to_dict(include_args=False) + self.assertNotEqual(fail_obj.exception_args, fail_json['exc_args']) + self.assertEqual(fail_json['exc_args'], tuple()) + class WrappedFailureTestCase(test.TestCase): diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index 7111686..a0127eb 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -162,6 +162,14 @@ class TestProtocol(test.TestCase): failures={self.task.name: a_failure.to_dict()}) self.assertEqual(expected, request.to_dict()) + def test_to_dict_with_invalid_json_failures(self): + exc = RuntimeError(Exception("I am not valid JSON")) + a_failure = failure.Failure.from_exception(exc) + request = self.request(failures={self.task.name: a_failure}) + expected = self.request_to_dict( + failures={self.task.name: a_failure.to_dict(include_args=False)}) + self.assertEqual(expected, request.to_dict()) + @mock.patch('oslo_utils.timeutils.now') def test_pending_not_expired(self, now): now.return_value = 0 diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py index 9b7b218..ec33dd9 100644 --- a/taskflow/types/failure.py +++ b/taskflow/types/failure.py @@ -499,14 +499,18 @@ class Failure(mixins.StrMixin): data['causes'] = tuple(cls.from_dict(d) for d in causes) return cls(**data) - def to_dict(self): - """Converts this object to a dictionary.""" + def to_dict(self, include_args=True): + """Converts this object to a dictionary. + + :param include_args: boolean indicating whether to include the + exception args in the output. + """ return { 'exception_str': self.exception_str, 'traceback_str': self.traceback_str, 'exc_type_names': list(self), 'version': self.DICT_VERSION, - 'exc_args': self.exception_args, + 'exc_args': self.exception_args if include_args else tuple(), 'causes': [f.to_dict() for f in self.causes], } |