==================
Inputs and outputs
==================

In TaskFlow there are multiple ways to provide inputs to your tasks and flows
and to get information back from them. This document describes one of them:
task arguments and results. There are also :doc:`notifications
<notifications>`, which allow you to get notified when a task or flow changes
state. You may also opt to use the :doc:`persistence <persistence>` layer
directly.

-----------------------
Flow inputs and outputs
-----------------------

Tasks accept inputs via task arguments and provide outputs via task results
(see :doc:`arguments and results <arguments_and_results>` for more details).
This is the standard and recommended way to pass data from one task to another.
Of course, not every task argument needs to be provided by some other task of
the flow, and not every task result needs to be consumed by every task.

If some value is required by one or more tasks of a flow, but it is not
provided by any task, it is considered to be a flow input, and **must** be put
into the storage before the flow is run. The set of names required by a flow
can be retrieved via that flow's ``requires`` property. Use it to determine
which names need to be placed in storage ahead of time and which do not.

All values provided by tasks of the flow are considered to be flow outputs; the
set of names of such values is available via the ``provides`` property of the
flow.

.. testsetup::

    from taskflow import task
    from taskflow.patterns import linear_flow
    from taskflow import engines
    from pprint import pprint

For example:

.. doctest::

   >>> class MyTask(task.Task):
   ...     def execute(self, **kwargs):
   ...         return 1, 2
   ...
   >>> flow = linear_flow.Flow('test').add(
   ...     MyTask(requires='a', provides=('b', 'c')),
   ...     MyTask(requires='b', provides='d')
   ... )
   >>> sorted(flow.requires)
   ['a']
   >>> sorted(flow.provides)
   ['b', 'c', 'd']

.. make vim syntax highlighter happy**

As you can see, this flow does not require ``b``, as it is provided by the
first task.
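
The rule illustrated above can be modelled in a few lines of plain Python. This
is a simplified sketch, not TaskFlow's implementation: ``flow_requires`` and
``flow_provides`` are hypothetical helpers, and each task is represented only
as a ``(requires, provides)`` pair of name sets.

```python
def flow_provides(tasks):
    """Union of every name produced by any task (hypothetical helper)."""
    provided = set()
    for _requires, provides in tasks:
        provided |= set(provides)
    return frozenset(provided)


def flow_requires(tasks):
    """Names some task needs that no task produces (hypothetical helper)."""
    required = set()
    for requires, _provides in tasks:
        required |= set(requires)
    return frozenset(required - flow_provides(tasks))


# Mirrors the flow above: the first task needs 'a' and yields 'b' and 'c',
# the second task needs 'b' and yields 'd'.
tasks = [({'a'}, {'b', 'c'}), ({'b'}, {'d'})]
print(sorted(flow_requires(tasks)))   # ['a']
print(sorted(flow_provides(tasks)))   # ['b', 'c', 'd']
```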

.. note::

   There is no difference between processing of
   :py:class:`Task <taskflow.task.Task>` and
   :py:class:`~taskflow.retry.Retry` inputs and outputs.

------------------
Engine and storage
------------------

The storage layer is how an engine persists flow and task details (for more
in-depth details see :doc:`persistence <persistence>`).

Inputs
------

As mentioned above, if some value is required by one or more tasks of a flow,
but is not provided by any task, it is considered to be a flow input, and
**must** be put into the storage before the flow is run. If it is not, the
engine raises :py:class:`~taskflow.exceptions.MissingDependencies` before it
starts running:

.. doctest::

   >>> class CatTalk(task.Task):
   ...   def execute(self, meow):
   ...     print(meow)
   ...     return "cat"
   ...
   >>> class DogTalk(task.Task):
   ...   def execute(self, woof):
   ...     print(woof)
   ...     return "dog"
   ...
   ...
   >>> flo = linear_flow.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   <taskflow.patterns.linear_flow.Flow object at 0x...>
   >>> engines.run(flo)
   Traceback (most recent call last):
      ...
   taskflow.exceptions.MissingDependencies: 'linear_flow.Flow: cat-dog(len=2)' requires ['meow', 'woof'] but no other entity produces said requirements
    MissingDependencies: 'execute' method on '__main__.DogTalk==1.0' requires ['woof'] but no other entity produces said requirements
    MissingDependencies: 'execute' method on '__main__.CatTalk==1.0' requires ['meow'] but no other entity produces said requirements
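
If you would rather detect missing inputs yourself before handing the flow to
an engine, the ``requires`` property described earlier can be compared against
the values you intend to store. The following is a minimal sketch under stated
assumptions: ``missing_inputs`` is a hypothetical helper (not part of
TaskFlow), and a ``SimpleNamespace`` stands in for a real flow so that the
snippet runs without TaskFlow installed.

```python
from types import SimpleNamespace


def missing_inputs(flow, store):
    """Return the sorted names that ``flow`` requires but ``store`` lacks.

    Hypothetical helper; it relies only on the ``requires`` attribute.
    """
    return sorted(set(flow.requires) - set(store))


# Stand-in for the "cat-dog" flow above; only ``requires`` is modelled.
fake_flow = SimpleNamespace(requires=frozenset(['meow', 'woof']))

print(missing_inputs(fake_flow, {'meow': 'meow'}))                  # ['woof']
print(missing_inputs(fake_flow, {'meow': 'meow', 'woof': 'woof'}))  # []
```

With a real flow you would pass the flow object itself in place of
``fake_flow``.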

The recommended way to provide flow inputs is to use the ``store`` parameter
of the engine helpers (:py:func:`~taskflow.engines.helpers.run` or
:py:func:`~taskflow.engines.helpers.load`):

.. doctest::

   >>> class CatTalk(task.Task):
   ...   def execute(self, meow):
   ...     print(meow)
   ...     return "cat"
   ...
   >>> class DogTalk(task.Task):
   ...   def execute(self, woof):
   ...     print(woof)
   ...     return "dog"
   ...
   ...
   >>> flo = linear_flow.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   <taskflow.patterns.linear_flow.Flow object at 0x...>
   >>> result = engines.run(flo, store={'meow': 'meow', 'woof': 'woof'})
   meow
   woof
   >>> pprint(result)
   {'dog': 'dog', 'meow': 'meow', 'woof': 'woof'}

You can also interact with the engine storage layer directly to add additional
values. Note that if this route is used you can't use the helper function
:py:func:`~taskflow.engines.helpers.run`. Instead, you must load the engine
first and then call its own
:py:func:`~taskflow.engines.base.EngineBase.run` method:

.. doctest::

   >>> flo = linear_flow.Flow("cat-dog")
   >>> flo.add(CatTalk(), DogTalk(provides="dog"))
   <taskflow.patterns.linear_flow.Flow object at 0x...>
   >>> eng = engines.load(flo, store={'meow': 'meow'})
   >>> eng.storage.inject({"woof": "bark"})
   >>> eng.run()
   meow
   bark

Outputs
-------

As you can see from the examples above, the ``run`` helper returns all flow
outputs in a ``dict``. The same data can be fetched via the
:py:meth:`~taskflow.storage.Storage.fetch_all` method of the engine's storage
object. You can also get a single result using the engine storage object's
:py:meth:`~taskflow.storage.Storage.fetch` method.

For example:

.. doctest::

   >>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
   >>> eng.run()
   meow
   woof
   >>> pprint(eng.storage.fetch_all())
   {'dog': 'dog', 'meow': 'meow', 'woof': 'woof'}
   >>> print(eng.storage.fetch("dog"))
   dog
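
As the ``fetch_all`` output above shows, the returned ``dict`` also contains
the values that were placed in storage as inputs. If only the values produced
by tasks are of interest, the result can be filtered against the flow's
``provides`` property. This is a minimal sketch: ``outputs_only`` is a
hypothetical helper (not part of TaskFlow), and a ``SimpleNamespace`` stands
in for the real flow so that the snippet is self-contained.

```python
from types import SimpleNamespace


def outputs_only(flow, results):
    """Keep only the entries whose names appear in ``flow.provides``.

    Hypothetical helper; ``results`` is a dict such as the one returned
    by ``fetch_all`` in the example above.
    """
    return {name: value for name, value in results.items()
            if name in flow.provides}


# Stand-in for the "cat-dog" flow, where only DogTalk declares an output.
fake_flow = SimpleNamespace(provides=frozenset(['dog']))
results = {'dog': 'dog', 'meow': 'meow', 'woof': 'woof'}

print(outputs_only(fake_flow, results))  # {'dog': 'dog'}
```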