Overview
========

This is an engine that schedules tasks to **workers** -- separate processes
dedicated to executing certain atoms, possibly running on other machines and
connected via `amqp`_ (or another supported `kombu`_ transport).

.. note::

    This engine is under active development; it is usable and **does** work,
    but it is missing some features (please check the `blueprint page`_ for
    known issues and plans) that will make it more production-ready.

.. _blueprint page: https://blueprints.launchpad.net/taskflow?searchtext=wbe

Terminology
-----------

Client
  Code or program or service (or user) that uses this library to define
  flows and run them via engines.

Transport + protocol
  Mechanism (and `protocol`_ on top of that mechanism) used to pass information
  between the client and worker (for example amqp as a transport and a json
  encoded message format as the protocol).

Executor
  The part of the worker-based engine that publishes task requests, so that
  these requests can be accepted and processed by remote workers.

Worker
  Workers are started on remote hosts and each has a list of tasks it can
  perform (on request). Workers accept and process task requests that are
  published by an executor. Several requests can be processed simultaneously
  in separate threads (or processes...). For example, an `executor`_ can be
  passed to the worker and configured to run in as many threads (green or
  not) as desired.

Proxy
  Executors interact with workers via a proxy. The proxy maintains the
  underlying transport and publishes messages (and invokes callbacks on message
  reception).

Requirements
------------

* **Transparent:** it should work as an ad-hoc replacement for existing
  *(local)* engines with minimal, if any, refactoring (e.g. it should be
  possible to run the same flows on it without changing client code if
  everything is set up and configured properly).
* **Transport-agnostic:** the means of transport should be abstracted so that
  we can use `oslo.messaging`_, `gearmand`_, `amqp`_, `zookeeper`_, `marconi`_,
  `websockets`_ or anything else that allows for passing information between a
  client and a worker.
* **Simple:** it should be simple to write and deploy.
* **Non-uniformity:** it should support non-uniform workers, which allows
  different workers to execute different sets of atoms depending on each
  worker's published capabilities.

.. _marconi: https://wiki.openstack.org/wiki/Marconi
.. _zookeeper: http://zookeeper.org/
.. _gearmand: http://gearman.org/
.. _oslo.messaging: https://wiki.openstack.org/wiki/Oslo/Messaging
.. _websockets: http://en.wikipedia.org/wiki/WebSocket
.. _amqp: http://www.amqp.org/
.. _executor: https://docs.python.org/dev/library/concurrent.futures.html#executor-objects
.. _protocol: http://en.wikipedia.org/wiki/Communications_protocol

Design
======

There are two communicating sides, the *executor* (and its associated engine
derivative) and the *worker*, which communicate using a proxy component. The
proxy is designed to accept/publish messages from/into a named exchange.
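
Conceptually the proxy builds on `kombu`_ primitives; below is a minimal
sketch of publishing a json message into a named exchange (the names and
exchange type are illustrative only, and the real proxy layers
request/response handling, consumption and callbacks on top of primitives
like these):

.. code:: python

    from kombu import Connection, Exchange

    # Illustrative only: publish a json payload into a named exchange.
    exchange = Exchange('test-exchange', type='topic')
    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        producer = conn.Producer(serializer='json')
        producer.publish({'action': 'execute'},
                         exchange=exchange,
                         routing_key='test-tasks',
                         declare=[exchange])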

High level architecture
-----------------------

.. image:: img/worker-engine.svg
   :height: 340px
   :align: right

Executor and worker communication
---------------------------------

Let's consider how communication between an executor and a worker happens.
First of all the engine resolves all atom dependencies and schedules the
atoms that can be performed at the moment, using the same scheduling and
dependency-resolution logic that every other engine type uses. The atoms that
can be executed immediately (ones that depend on the outputs of other tasks
are executed once those outputs are ready) are then executed by the
worker-based engine's executor in the following manner:

1. The executor initiates task execution/reversion using a proxy object.
2. :py:class:`~taskflow.engines.worker_based.proxy.Proxy` publishes a task
   request (the format is described below) into a named exchange using a
   routing key that delivers the request to a particular worker's topic. The
   executor then waits for the task request to be accepted and confirmed by a
   worker. If the executor doesn't get a task confirmation from a worker
   within the given timeout the task is considered timed out and a timeout
   exception is raised.
3. A worker receives a request message and starts a new thread for processing
   it.

   1. The worker dispatches the request (it looks up the desired endpoint
      that actually executes the task).
   2. If dispatching succeeded then the worker sends a confirmation response
      to the executor, otherwise the worker sends a failed response along with
      a serialized :py:class:`failure <taskflow.types.failure.Failure>` object
      that contains what has failed (and why).
   3. The worker executes the task and once it has finished sends the result
      back to the originating executor (every time a task progress event is
      triggered a progress notification is also sent to the executor, where it
      is handled by the engine and dispatched to listeners and so on).

4. The executor gets the task request confirmation from the worker and the
   task request state changes from ``PENDING`` to ``RUNNING``. Once a task
   request is in the ``RUNNING`` state it can no longer be timed out
   (considering that the task execution process may take an unpredictable
   amount of time).
5. The executor gets the task execution result from the worker and passes it
   back to the worker-based engine to finish task processing (this repeats
   for subsequent tasks).

.. note::

    :py:class:`~taskflow.types.failure.Failure` objects are not directly
    json-serializable (they contain references to tracebacks which are not
    serializable), so they are converted to dicts before sending and converted
    from dicts after receiving on both executor & worker sides (this
    translation is lossy since the traceback can't be fully retained, due
    to its contents containing internal interpreter references and
    details).
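
For illustration, here is a minimal sketch of that round-trip using the
conversion helpers on :py:class:`~taskflow.types.failure.Failure` (treat
this as a sketch of the concept rather than canonical usage):

.. code:: python

    from taskflow.types import failure

    try:
        raise RuntimeError('Woot!')
    except RuntimeError:
        # Capture the active exception (and its traceback) as a failure.
        fail = failure.Failure()

    # Convert to a json-friendly dict before publishing on the transport...
    data = fail.to_dict()

    # ...and rebuild it after receiving (the traceback survives only as
    # text, which is why the translation is lossy).
    rebuilt = failure.Failure.from_dict(data)
    print(rebuilt.exception_str)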

Protocol
~~~~~~~~

.. automodule:: taskflow.engines.worker_based.protocol

Examples
~~~~~~~~

Request (execute)
"""""""""""""""""

* **task_name** - full task name to be performed
* **task_cls** - full task class name to be performed
* **task_version** - task version (as seen in the examples below)
* **action** - task action to be performed (e.g. execute, revert)
* **arguments** - arguments the task action will be called with
* **result** - task execution result (a result or
  :py:class:`~taskflow.types.failure.Failure`) *[passed to revert only]*

Additionally, the following parameters are added to the request message:

* **reply_to** - the executor's named exchange that workers will send
  responses back to
* **correlation_id** - executor request id (since there can be multiple
  requests being processed simultaneously)

**Example:**

.. code:: json

    {
        "action": "execute",
        "arguments": {
            "x": 111
        },
        "task_cls": "taskflow.tests.utils.TaskOneArgOneReturn",
        "task_name": "taskflow.tests.utils.TaskOneArgOneReturn",
        "task_version": [
            1,
            0
        ]
    }


Request (revert)
""""""""""""""""

When **reverting:**

.. code:: json

    {
        "action": "revert",
        "arguments": {},
        "failures": {
            "taskflow.tests.utils.TaskWithFailure": {
                "exc_type_names": [
                    "RuntimeError",
                    "StandardError",
                    "Exception"
                ],
                "exception_str": "Woot!",
                "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
                "version": 1
            }
        },
        "result": [
            "failure",
            {
                "exc_type_names": [
                    "RuntimeError",
                    "StandardError",
                    "Exception"
                ],
                "exception_str": "Woot!",
                "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
                "version": 1
            }
        ],
        "task_cls": "taskflow.tests.utils.TaskWithFailure",
        "task_name": "taskflow.tests.utils.TaskWithFailure",
        "task_version": [
            1,
            0
        ]
    }

Worker response(s)
""""""""""""""""""

When **running:**

.. code:: json

    {
        "data": {},
        "state": "RUNNING"
    }

When **progressing:**

.. code:: json

    {
        "details": {
            "progress": 0.5
        },
        "event_type": "update_progress",
        "state": "EVENT"
    }

When **succeeded:**

.. code:: json

    {
        "data": {
            "result": 666
        },
        "state": "SUCCESS"
    }

When **failed:**

.. code:: json

    {
        "data": {
            "result": {
                "exc_type_names": [
                    "RuntimeError",
                    "StandardError",
                    "Exception"
                ],
                "exception_str": "Woot!",
                "traceback_str": "  File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n    result = task.execute(**arguments)\n  File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n    raise RuntimeError('Woot!')\n",
                "version": 1
            }
        },
        "state": "FAILURE"
    }

Request state transitions
-------------------------

.. image:: img/wbe_request_states.svg
   :width: 520px
   :align: center
   :alt: WBE request state transitions

**WAITING** - The request has been placed on a queue (or other `kombu`_
message bus/transport) but not *yet* consumed.

**PENDING** - A worker has accepted the request, which is now pending
execution by that worker's executor (threads, processes, or other).

**FAILURE** - The worker failed while running the request (due to a task
exception) or no worker moved to/started executing the request (by placing
it into the ``RUNNING`` state) within the specified time span (this defaults
to 60 seconds unless overridden).

**RUNNING** - A worker's executor (using threads, processes...) has started
to run the requested task (once a request transitions to this state the
request timeout no longer applies, since at that point it is unknown how long
the task will run and it cannot be determined whether the task is just taking
a long time or has failed).

**SUCCESS** - The worker finished running the task without raising an
exception.

.. note::

    During the ``WAITING`` and ``PENDING`` stages the engine keeps track
    of how long the request has been *alive*; if a timeout is reached
    the request will automatically transition to ``FAILURE`` and any further
    transitions from a worker will be disallowed (for example, if a worker
    accepts the request in the future and moves it to ``PENDING``, this
    transition will be logged and ignored). This timeout can be adjusted and/or
    removed by setting the engine ``transition_timeout`` option to a
    higher/lower value or by setting it to ``None`` (to remove the timeout
    completely). In the future this will be improved to be more dynamic
    by implementing the blueprints associated with `failover`_ and
    `info/resilience`_.

.. _failover: https://blueprints.launchpad.net/taskflow/+spec/wbe-worker-failover
.. _info/resilience: https://blueprints.launchpad.net/taskflow/+spec/wbe-worker-info
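
A conceptual sketch of this timeout bookkeeping is shown below (purely
illustrative; the engine's real logic lives in the protocol module
documented above):

.. code:: python

    import time

    # Illustrative only -- not the engine's actual implementation.
    TRANSITION_TIMEOUT = 60.0  # documented default (seconds); None disables


    class Request(object):
        def __init__(self, timeout=TRANSITION_TIMEOUT):
            self.state = 'WAITING'
            self.timeout = timeout
            self.created_on = time.time()

        def expired(self):
            # Only requests that have not yet reached RUNNING can time out.
            if self.timeout is None:
                return False
            if self.state not in ('WAITING', 'PENDING'):
                return False
            return (time.time() - self.created_on) > self.timeout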

Usage
=====

Workers
-------

To use the worker-based engine a set of workers must first be established on
remote machines. These workers must be provided a list of task objects, task
names, or module names (or entrypoints that can be examined for valid tasks)
they can respond to (this is done so that arbitrary code execution is not
possible).

For complete parameters and object usage please visit
:py:class:`~taskflow.engines.worker_based.worker.Worker`.

**Example:**

.. code:: python

    from taskflow.engines.worker_based import worker as w

    config = {
        # Transport/connection url (used by kombu).
        'url': 'amqp://guest:guest@localhost:5672//',
        # Exchange to consume task requests from.
        'exchange': 'test-exchange',
        # Topic (routing key) this worker responds to.
        'topic': 'test-tasks',
        # 'module:class' names of the tasks this worker may run.
        'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
    }
    worker = w.Worker(**config)
    worker.run()
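
The ``'tasks:TestTask1'`` entries above are ``module:class`` names; a minimal
(hypothetical) ``tasks.py`` module that such a worker could load might look
like this:

.. code:: python

    # tasks.py -- hypothetical module matching the 'tasks:...' entries above.
    from taskflow import task


    class TestTask1(task.Task):
        def execute(self, x):
            return x + 1


    class TestTask2(task.Task):
        def execute(self, y):
            return y * 2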

Engines
-------

To use the worker-based engine a flow must be constructed (which contains
tasks that are visible on remote machines) and the specific worker-based
engine entrypoint must be selected. Certain configuration options must also be
provided so that the transport backend can be configured and initialized
correctly. Otherwise the usage should be mostly transparent (and is nearly
identical to using any other engine type).

For complete parameters and object usage please see
:py:class:`~taskflow.engines.worker_based.engine.WorkerBasedActionEngine`.

**Example with amqp transport:**

.. code:: python

    import taskflow.engines

    from taskflow.patterns import linear_flow as lf

    flow = lf.Flow('simple-linear').add(...)
    eng = taskflow.engines.load(flow, engine='worker-based',
                                url='amqp://guest:guest@localhost:5672//',
                                exchange='test-exchange',
                                topics=['topic1', 'topic2'])
    eng.run()

**Example with filesystem transport:**

.. code:: python

    import taskflow.engines

    from taskflow.patterns import linear_flow as lf

    flow = lf.Flow('simple-linear').add(...)
    eng = taskflow.engines.load(flow, engine='worker-based',
                                exchange='test-exchange',
                                topics=['topic1', 'topic2'],
                                transport='filesystem',
                                transport_options={
                                    'data_folder_in': '/tmp/in',
                                    'data_folder_out': '/tmp/out',
                                })
    eng.run()
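
The filesystem transport (provided by `kombu`_) is mainly useful for local
testing and examples; both sides must have access to the configured data
folders for messages to flow between the engine and its workers.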

Additional supported keyword arguments:

* ``executor``: a class that provides a
  :py:class:`~taskflow.engines.worker_based.executor.WorkerTaskExecutor`
  interface; it will be used for executing, reverting and waiting for remote
  tasks.

Limitations
===========

* Atoms inside a flow must receive and accept parameters only in the ways
  defined in :doc:`persistence <persistence>`. In other words, the task
  that is created when a workflow is constructed will not be the same task
  instance that is executed on a remote worker (and any internal state not
  passed via the :doc:`input and output <inputs_and_outputs>` mechanism can
  not be transferred). This means resource objects (database handles, file
  descriptors, sockets, ...) can **not** be directly sent across to remote
  workers; instead, the configuration that defines how to fetch/create these
  objects must be sent (see the sketch after this list).
* Worker-based engines will in the future be able to run lightweight tasks
  locally to avoid transport overhead for very simple tasks (currently they
  will run even lightweight tasks remotely, which may be non-performant).
* Fault detection: currently when a worker acknowledges a task the engine will
  wait for the task result indefinitely (a task may take an indeterminate
  amount of time to finish). In the future there needs to be a way to limit
  the duration of a remote worker's execution (and to track worker liveness)
  and possibly spawn the task on a secondary worker if a timeout is reached
  (i.e. when the first worker has died or stopped responding).
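
For illustration, here is a hypothetical sketch of the correct pattern: pass
plain, serializable configuration (in this case a filesystem path) through
the flow and (re)create the resource on the worker side:

.. code:: python

    import sqlite3

    from taskflow import task


    class QueryTask(task.Task):
        """Receives a plain path (json-friendly), never a live handle."""

        def execute(self, db_path):
            # The connection is (re)created on the worker from plain data;
            # a live sqlite3.Connection could not be serialized and sent.
            conn = sqlite3.connect(db_path)
            try:
                return conn.execute('SELECT 1').fetchone()[0]
            finally:
                conn.close()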

Implementations
===============

.. automodule:: taskflow.engines.worker_based.engine

Components
----------

.. automodule:: taskflow.engines.worker_based.proxy
.. automodule:: taskflow.engines.worker_based.worker

.. _kombu: http://kombu.readthedocs.org/