summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-03 22:39:58 +0000
committerGerrit Code Review <review@openstack.org>2016-02-03 22:39:58 +0000
commit61efc31e965b8defac84aa494095fd3734f739eb (patch)
tree11f784d16aed7733d91e857809c3d9df5d929a63
parent48c4187a2e07684d4db024f75653ea8cbc207aa5 (diff)
parent8e8fd65225788183c848e19d927a00c729570f65 (diff)
downloadtaskflow-61efc31e965b8defac84aa494095fd3734f739eb.tar.gz
Merge "Handle cases where exc_args can't be serialized as JSON in the WBE"
-rw-r--r--taskflow/engines/worker_based/protocol.py17
-rw-r--r--taskflow/engines/worker_based/server.py10
-rw-r--r--taskflow/tests/unit/test_failure.py10
-rw-r--r--taskflow/tests/unit/worker_based/test_protocol.py8
-rw-r--r--taskflow/types/failure.py10
5 files changed, 45 insertions, 10 deletions
diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py
index 63556c2..d2d8e34 100644
--- a/taskflow/engines/worker_based/protocol.py
+++ b/taskflow/engines/worker_based/protocol.py
@@ -20,6 +20,7 @@ import threading
import fasteners
import futurist
+from oslo_serialization import jsonutils
from oslo_utils import reflection
from oslo_utils import timeutils
import six
@@ -100,6 +101,17 @@ RESPONSE = 'RESPONSE'
LOG = logging.getLogger(__name__)
+def failure_to_dict(failure):
+ failure_dict = failure.to_dict()
+ try:
+ # it's possible the exc_args can't be serialized as JSON
+ # if that's the case, just get the failure without them
+ jsonutils.dumps(failure_dict)
+ return failure_dict
+ except (TypeError, ValueError):
+ return failure.to_dict(include_args=False)
+
+
@six.add_metaclass(abc.ABCMeta)
class Message(object):
"""Base class for all message types."""
@@ -301,6 +313,7 @@ class Request(Message):
convert all `failure.Failure` objects into dictionaries (which will
then be reconstituted by the receiver).
"""
+
request = {
'task_cls': reflection.get_class_name(self._task),
'task_name': self._task.name,
@@ -311,14 +324,14 @@ class Request(Message):
if 'result' in self._kwargs:
result = self._kwargs['result']
if isinstance(result, ft.Failure):
- request['result'] = ('failure', result.to_dict())
+ request['result'] = ('failure', failure_to_dict(result))
else:
request['result'] = ('success', result)
if 'failures' in self._kwargs:
failures = self._kwargs['failures']
request['failures'] = {}
for task, failure in six.iteritems(failures):
- request['failures'][task] = failure.to_dict()
+ request['failures'][task] = failure_to_dict(failure)
return request
def set_result(self, result):
diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py
index 4c3b32b..978ab3a 100644
--- a/taskflow/engines/worker_based/server.py
+++ b/taskflow/engines/worker_based/server.py
@@ -179,7 +179,7 @@ class Server(object):
with misc.capture_failure() as failure:
LOG.warn("Failed to parse request contents from message '%s'",
ku.DelayedPretty(message), exc_info=True)
- reply_callback(result=failure.to_dict())
+ reply_callback(result=pr.failure_to_dict(failure))
return
# Now fetch the task endpoint (and action handler on it).
@@ -191,7 +191,7 @@ class Server(object):
" to continue processing request message '%s'",
work.task_cls, ku.DelayedPretty(message),
exc_info=True)
- reply_callback(result=failure.to_dict())
+ reply_callback(result=pr.failure_to_dict(failure))
return
else:
try:
@@ -202,7 +202,7 @@ class Server(object):
" '%s', unable to continue processing request"
" message '%s'", work.action, endpoint,
ku.DelayedPretty(message), exc_info=True)
- reply_callback(result=failure.to_dict())
+ reply_callback(result=pr.failure_to_dict(failure))
return
else:
try:
@@ -212,7 +212,7 @@ class Server(object):
LOG.warn("The '%s' task '%s' generation for request"
" message '%s' failed", endpoint, work.action,
ku.DelayedPretty(message), exc_info=True)
- reply_callback(result=failure.to_dict())
+ reply_callback(result=pr.failure_to_dict(failure))
return
else:
if not reply_callback(state=pr.RUNNING):
@@ -240,7 +240,7 @@ class Server(object):
LOG.warn("The '%s' endpoint '%s' execution for request"
" message '%s' failed", endpoint, work.action,
ku.DelayedPretty(message), exc_info=True)
- reply_callback(result=failure.to_dict())
+ reply_callback(result=pr.failure_to_dict(failure))
else:
# And be done with it!
if isinstance(result, ft.Failure):
diff --git a/taskflow/tests/unit/test_failure.py b/taskflow/tests/unit/test_failure.py
index 6d12ac0..bc95dd0 100644
--- a/taskflow/tests/unit/test_failure.py
+++ b/taskflow/tests/unit/test_failure.py
@@ -283,6 +283,16 @@ class FailureObjectTestCase(test.TestCase):
text = captured.pformat(traceback=True)
self.assertIn("Traceback (most recent call last):", text)
+ def test_no_capture_exc_args(self):
+ captured = _captured_failure(Exception("I am not valid JSON"))
+ fail_obj = failure.Failure(exception_str=captured.exception_str,
+ traceback_str=captured.traceback_str,
+ exc_type_names=list(captured),
+ exc_args=list(captured.exception_args))
+ fail_json = fail_obj.to_dict(include_args=False)
+ self.assertNotEqual(fail_obj.exception_args, fail_json['exc_args'])
+ self.assertEqual(fail_json['exc_args'], tuple())
+
class WrappedFailureTestCase(test.TestCase):
diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py
index 7111686..a0127eb 100644
--- a/taskflow/tests/unit/worker_based/test_protocol.py
+++ b/taskflow/tests/unit/worker_based/test_protocol.py
@@ -162,6 +162,14 @@ class TestProtocol(test.TestCase):
failures={self.task.name: a_failure.to_dict()})
self.assertEqual(expected, request.to_dict())
+ def test_to_dict_with_invalid_json_failures(self):
+ exc = RuntimeError(Exception("I am not valid JSON"))
+ a_failure = failure.Failure.from_exception(exc)
+ request = self.request(failures={self.task.name: a_failure})
+ expected = self.request_to_dict(
+ failures={self.task.name: a_failure.to_dict(include_args=False)})
+ self.assertEqual(expected, request.to_dict())
+
@mock.patch('oslo_utils.timeutils.now')
def test_pending_not_expired(self, now):
now.return_value = 0
diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py
index 9b7b218..ec33dd9 100644
--- a/taskflow/types/failure.py
+++ b/taskflow/types/failure.py
@@ -499,14 +499,18 @@ class Failure(mixins.StrMixin):
data['causes'] = tuple(cls.from_dict(d) for d in causes)
return cls(**data)
- def to_dict(self):
- """Converts this object to a dictionary."""
+ def to_dict(self, include_args=True):
+ """Converts this object to a dictionary.
+
+ :param include_args: boolean indicating whether to include the
+ exception args in the output.
+ """
return {
'exception_str': self.exception_str,
'traceback_str': self.traceback_str,
'exc_type_names': list(self),
'version': self.DICT_VERSION,
- 'exc_args': self.exception_args,
+ 'exc_args': self.exception_args if include_args else tuple(),
'causes': [f.to_dict() for f in self.causes],
}