summaryrefslogtreecommitdiff
path: root/taskflow/engines/worker_based/server.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-03-10 18:01:44 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-03-11 12:53:15 -0700
commitad133adea6ee293f0cfc6145a483fa0cfc27faf6 (patch)
tree6989d2c82383391ab2c33ebc575f1894699a2f33 /taskflow/engines/worker_based/server.py
parent1478f52c9ad8c21d9fa5396118bde54ab8cc6bd7 (diff)
downloadtaskflow-ad133adea6ee293f0cfc6145a483fa0cfc27faf6.tar.gz
Add + use failure json schema validation
Change-Id: Ie3aa386c831459a028ba494570bafd53b998126e
Diffstat (limited to 'taskflow/engines/worker_based/server.py')
-rw-r--r--taskflow/engines/worker_based/server.py30
1 files changed, 1 insertions, 29 deletions
diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py
index 1043e87..99a3989 100644
--- a/taskflow/engines/worker_based/server.py
+++ b/taskflow/engines/worker_based/server.py
@@ -17,7 +17,6 @@
import functools
from oslo_utils import reflection
-import six
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
@@ -95,32 +94,6 @@ class Server(object):
return self._proxy.connection_details
@staticmethod
- def _parse_request(task_cls, task_name, action, arguments, result=None,
- failures=None, **kwargs):
- """Parse request before it can be further processed.
-
- All `failure.Failure` objects that have been converted to dict on the
- remote side will now converted back to `failure.Failure` objects.
- """
- # These arguments will eventually be given to the task executor
- # so they need to be in a format it will accept (and using keyword
- # argument names that it accepts)...
- arguments = {
- 'arguments': arguments,
- }
- if result is not None:
- data_type, data = result
- if data_type == 'failure':
- arguments['result'] = ft.Failure.from_dict(data)
- else:
- arguments['result'] = data
- if failures is not None:
- arguments['failures'] = {}
- for key, data in six.iteritems(failures):
- arguments['failures'][key] = ft.Failure.from_dict(data)
- return (task_cls, task_name, action, arguments)
-
- @staticmethod
def _parse_message(message):
"""Extracts required attributes out of the messages properties.
@@ -201,9 +174,8 @@ class Server(object):
# parse request to get task name, action and action arguments
try:
- bundle = self._parse_request(**request)
+ bundle = pr.Request.from_dict(request, task_uuid=task_uuid)
task_cls, task_name, action, arguments = bundle
- arguments['task_uuid'] = task_uuid
except ValueError:
with misc.capture_failure() as failure:
LOG.warn("Failed to parse request contents from message '%s'",